From 54cfde022a402a9ef25251e226daaa0500305648 Mon Sep 17 00:00:00 2001 From: qin-ctx Date: Wed, 10 Jun 2026 19:25:59 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat(resource):=20=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E9=A3=9E=E4=B9=A6=E7=94=A8=E6=88=B7=20token=20=E5=AF=BC?= =?UTF-8?q?=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/ov_cli/src/client.rs | 8 +- crates/ov_cli/src/commands/resources.rs | 3 + crates/ov_cli/src/handlers.rs | 159 ++++++++++++++++ crates/ov_cli/src/main.rs | 25 +++ docs/en/api/02-resources.md | 25 ++- docs/en/guides/06-mcp-integration.md | 4 +- docs/zh/api/02-resources.md | 25 ++- docs/zh/guides/06-mcp-integration.md | 4 +- openviking/async_client.py | 2 + openviking/client/local.py | 2 + openviking/parse/accessors/feishu_accessor.py | 90 +++++++-- openviking/server/mcp_endpoint.py | 7 +- openviking/server/routers/resources.py | 6 +- openviking/service/resource_service.py | 71 +++++++ openviking/sync_client.py | 2 + openviking_cli/client/base.py | 1 + openviking_cli/client/http.py | 2 + openviking_cli/client/sync_http.py | 2 + tests/client/test_http_client_local_upload.py | 20 ++ tests/parse/test_feishu_accessor.py | 174 ++++++++++++++++++ tests/server/test_api_resources.py | 28 +++ tests/service/test_resource_service_watch.py | 68 ++++++- 22 files changed, 702 insertions(+), 26 deletions(-) create mode 100644 tests/parse/test_feishu_accessor.py diff --git a/crates/ov_cli/src/client.rs b/crates/ov_cli/src/client.rs index 6f228e68fa..5d66378ece 100644 --- a/crates/ov_cli/src/client.rs +++ b/crates/ov_cli/src/client.rs @@ -1,5 +1,5 @@ use serde::de::DeserializeOwned; -use serde_json::Value; +use serde_json::{Map, Value}; use std::env; use std::path::Path; @@ -482,10 +482,12 @@ impl HttpClient { exclude: Option, directly_upload_media: bool, watch_interval: f64, + resource_args: Option>, show_progress: bool, verbose: bool, ) -> Result { let path_obj = Path::new(path); + let args = Value::Object(resource_args.unwrap_or_default()); // Determine effective parent and create_parent flag. // Only send create_parent when the user explicitly selected @@ -541,6 +543,7 @@ impl HttpClient { "exclude": exclude, "directly_upload_media": directly_upload_media, "watch_interval": watch_interval, + "args": args.clone(), })); let dynamic_timeout = @@ -575,6 +578,7 @@ impl HttpClient { "exclude": exclude, "directly_upload_media": directly_upload_media, "watch_interval": watch_interval, + "args": args.clone(), })); let dynamic_timeout = @@ -597,6 +601,7 @@ impl HttpClient { "exclude": exclude, "directly_upload_media": directly_upload_media, "watch_interval": watch_interval, + "args": args.clone(), })); self.post("/api/v1/resources", &body).await @@ -616,6 +621,7 @@ impl HttpClient { "exclude": exclude, "directly_upload_media": directly_upload_media, "watch_interval": watch_interval, + "args": args, })); self.post("/api/v1/resources", &body).await diff --git a/crates/ov_cli/src/commands/resources.rs b/crates/ov_cli/src/commands/resources.rs index ca9c404cdc..1900782548 100644 --- a/crates/ov_cli/src/commands/resources.rs +++ b/crates/ov_cli/src/commands/resources.rs @@ -1,6 +1,7 @@ use crate::client::HttpClient; use crate::error::Result; use crate::output::{OutputFormat, output_success}; +use serde_json::{Map, Value}; pub async fn add_resource( client: &HttpClient, @@ -18,6 +19,7 @@ pub async fn add_resource( exclude: Option, directly_upload_media: bool, watch_interval: f64, + resource_args: Option>, format: OutputFormat, compact: bool, show_progress: bool, @@ -39,6 +41,7 @@ pub async fn add_resource( exclude, directly_upload_media, watch_interval, + resource_args, show_progress, verbose, ) diff --git a/crates/ov_cli/src/handlers.rs b/crates/ov_cli/src/handlers.rs index f1e9d10390..67833946fa 100644 --- a/crates/ov_cli/src/handlers.rs +++ b/crates/ov_cli/src/handlers.rs @@ -8,6 +8,7 @@ use crate::error::{Error, Result}; use crate::theme; use crate::tui; use colored::Colorize; +use serde_json::{Map, Value}; pub async fn handle_add_resource( mut path: String, @@ -24,6 +25,7 @@ pub async fn handle_add_resource( exclude: Option, no_directly_upload_media: bool, watch_interval: f64, + resource_args: Option, ctx: CliContext, ) -> Result<()> { let is_url = @@ -68,6 +70,7 @@ pub async fn handle_add_resource( merge_csv_options(ctx.config.upload.ignore_dirs.clone(), ignore_dirs); let effective_include = merge_csv_options(ctx.config.upload.include.clone(), include); let effective_exclude = merge_csv_options(ctx.config.upload.exclude.clone(), exclude); + let add_resource_args = parse_add_resource_args(resource_args.as_deref())?; let effective_timeout = if wait { timeout.unwrap_or(60.0).max(ctx.config.timeout) @@ -100,6 +103,7 @@ pub async fn handle_add_resource( effective_exclude, directly_upload_media, watch_interval, + add_resource_args, ctx.output_format, ctx.compact, ctx.should_show_progress(), @@ -108,6 +112,123 @@ pub async fn handle_add_resource( .await } +fn parse_add_resource_args(raw: Option<&str>) -> Result>> { + let Some(raw) = raw.map(str::trim).filter(|raw| !raw.is_empty()) else { + return Ok(None); + }; + + if raw.starts_with('{') { + let value: Value = serde_json::from_str(raw) + .map_err(|e| Error::Client(format!("Invalid --args JSON object: {e}")))?; + return match value { + Value::Object(map) => Ok(Some(map)), + _ => Err(Error::Client( + "--args JSON form must be an object, e.g. '{\"feishu_access_token\":\"u-...\"}'" + .to_string(), + )), + }; + } + + let mut args = Map::new(); + for item in split_add_resource_args(raw)? { + let Some((key, value)) = item.split_once(':') else { + return Err(Error::Client(format!( + "Invalid --args item '{item}'. Expected key:value." + ))); + }; + let key = key.trim(); + if key.is_empty() { + return Err(Error::Client( + "Invalid --args item with empty key.".to_string(), + )); + } + args.insert(key.to_string(), parse_add_resource_arg_value(value.trim())); + } + Ok(Some(args)) +} + +fn split_add_resource_args(raw: &str) -> Result> { + let mut items = Vec::new(); + let mut current = String::new(); + let mut quote: Option = None; + let mut escape = false; + let mut depth = 0_i32; + + for ch in raw.chars() { + if escape { + current.push(ch); + escape = false; + continue; + } + if ch == '\\' { + current.push(ch); + escape = true; + continue; + } + if let Some(q) = quote { + current.push(ch); + if ch == q { + quote = None; + } + continue; + } + match ch { + '"' | '\'' => { + quote = Some(ch); + current.push(ch); + } + '{' | '[' => { + depth += 1; + current.push(ch); + } + '}' | ']' => { + depth -= 1; + if depth < 0 { + return Err(Error::Client("Invalid --args nesting.".to_string())); + } + current.push(ch); + } + ',' if depth == 0 => { + let item = current.trim(); + if !item.is_empty() { + items.push(item.to_string()); + } + current.clear(); + } + _ => current.push(ch), + } + } + + if quote.is_some() || depth != 0 { + return Err(Error::Client( + "Invalid --args quoting or nesting.".to_string(), + )); + } + let item = current.trim(); + if !item.is_empty() { + items.push(item.to_string()); + } + Ok(items) +} + +fn parse_add_resource_arg_value(raw: &str) -> Value { + if raw.is_empty() { + return Value::String(String::new()); + } + if let Ok(value) = serde_json::from_str::(raw) { + return value; + } + let unquoted = raw + .strip_prefix('"') + .and_then(|value| value.strip_suffix('"')) + .or_else(|| { + raw.strip_prefix('\'') + .and_then(|value| value.strip_suffix('\'')) + }) + .unwrap_or(raw); + Value::String(unquoted.to_string()) +} + pub async fn handle_add_skill( data: String, wait: bool, @@ -1432,6 +1553,44 @@ pub async fn handle_tui(uri: String, ctx: CliContext) -> Result<()> { tui::run_tui(client, &uri).await } +#[cfg(test)] +mod add_resource_args_tests { + use super::*; + + #[test] + fn parses_key_value_args() { + let args = parse_add_resource_args(Some("feishu_access_token:u-test,limit:3,deep:true")) + .expect("args should parse") + .expect("args should be present"); + + assert_eq!( + args.get("feishu_access_token"), + Some(&Value::String("u-test".to_string())) + ); + assert_eq!(args.get("limit"), Some(&serde_json::json!(3))); + assert_eq!(args.get("deep"), Some(&Value::Bool(true))); + } + + #[test] + fn parses_json_object_args() { + let args = parse_add_resource_args(Some(r#"{"feishu_access_token":"u-test"}"#)) + .expect("json object should parse") + .expect("args should be present"); + + assert_eq!( + args.get("feishu_access_token"), + Some(&Value::String("u-test".to_string())) + ); + } + + #[test] + fn rejects_invalid_args_item() { + let err = parse_add_resource_args(Some("feishu_access_token")).unwrap_err(); + + assert!(err.to_string().contains("Expected key:value")); + } +} + #[cfg(test)] mod config_switch_prompt_tests { use super::*; diff --git a/crates/ov_cli/src/main.rs b/crates/ov_cli/src/main.rs index 5ec672ca6e..e71335e8ee 100644 --- a/crates/ov_cli/src/main.rs +++ b/crates/ov_cli/src/main.rs @@ -261,6 +261,9 @@ enum Commands { /// Watch interval in minutes for automatic resource monitoring (0 = no monitoring) #[arg(long, default_value = "0")] watch_interval: f64, + /// Parser-specific import options, e.g. --args feishu_access_token:u-xxx + #[arg(long = "args")] + resource_args: Option, #[command(flatten)] upload_options: UploadCliOptions, }, @@ -2164,6 +2167,7 @@ async fn main() { exclude, no_directly_upload_media, watch_interval, + resource_args, upload_options, } => { let ctx = @@ -2183,6 +2187,7 @@ async fn main() { exclude, no_directly_upload_media, watch_interval, + resource_args, ctx, ) .await @@ -2981,6 +2986,7 @@ mod tests { assert!(help.contains("--progress")); assert!(help.contains("--no-progress")); assert!(help.contains("--verbose")); + assert!(help.contains("--args")); } #[test] @@ -3041,6 +3047,25 @@ mod tests { assert!(Cli::try_parse_from(["ov", "skills", "update", "--progress"]).is_err()); } + #[test] + fn cli_parses_add_resource_args() { + let cli = Cli::try_parse_from([ + "ov", + "add-resource", + "https://example.feishu.cn/docx/doc123", + "--args", + "feishu_access_token:u-test", + ]) + .expect("add-resource args should parse"); + + match cli.command { + Commands::AddResource { resource_args, .. } => { + assert_eq!(resource_args.as_deref(), Some("feishu_access_token:u-test")); + } + _ => panic!("expected add-resource command"), + } + } + #[test] fn cli_parses_skills_command_group() { let list = Cli::try_parse_from(["ov", "skills", "list", "--limit", "25"]) diff --git a/docs/en/api/02-resources.md b/docs/en/api/02-resources.md index 0c7457f402..3a9ecc758c 100644 --- a/docs/en/api/02-resources.md +++ b/docs/en/api/02-resources.md @@ -47,7 +47,7 @@ OpenViking supports various resource types, categorized by functionality: | Type | Description | |------|-------------| -| Feishu/Lark | URL-based, supports docx, wiki, sheets, bitable, requires FEISHU_APP_ID and FEISHU_APP_SECRET configuration | +| Feishu/Lark | URL-based, supports docx, wiki, sheets, bitable. By default uses app credentials from FEISHU_APP_ID and FEISHU_APP_SECRET; one-time user-token imports can pass `args.feishu_access_token` | ### Resource Processing Pipeline @@ -156,6 +156,7 @@ This endpoint is the core entry point for resource management, supporting adding | exclude | string | No | None | File patterns to exclude (glob) | | directly_upload_media | bool | No | True | Whether to directly upload media files | | preserve_structure | bool | No | None | Whether to preserve directory structure | +| args | object | No | `{}` | Parser-specific import options forwarded to the source parser/accessor. Core `add_resource` fields such as `path`, `to`, `watch_interval`, `include`, and `exclude` are not allowed inside `args` | | watch_interval | float | No | 0 | Scheduled update interval (minutes). >0 creates task; <=0 cancels task; explicit `to` wins, otherwise binds to the imported `root_uri` | | telemetry | TelemetryRequest | No | False | Whether to return telemetry data | @@ -167,6 +168,8 @@ This endpoint is the core entry point for resource management, supporting adding - Only Git repository sources use full background import when `wait=false`; OpenViking performs repository preflight and target planning before returning the `task_id`. - Other sources with `wait=false` finish source parsing, target resolution, and AGFS writes before returning. Only semantic and embedding queues continue asynchronously. - When `watch_interval > 0`, the watch task binds to `to` if provided; otherwise it binds to the `root_uri` returned by this import. If no stable `root_uri` is available, the request fails and asks for an explicit `to`. +- Feishu/Lark user-token imports pass `args={"feishu_access_token": "u-..."}`. This mode is one-time only: `watch_interval > 0` is rejected because OpenViking does not store or refresh the user token. +- Feishu/Lark app credentials are still required to construct the Lark client. Without `args.feishu_access_token`, OpenViking keeps the existing app credential flow and the SDK obtains a tenant access token from `app_id` and `app_secret`. - For local directory inputs, scanning respects `.gitignore` files (root and nested) with standard Git semantics; `ignore_dirs`, `include`, and `exclude` further refine what is ingested. - To create or update plain text directly, use [content/write](03-filesystem.md#write) instead of `add_resource`. Semantic processing and embeddings are refreshed automatically after resource ingestion and content writes. @@ -206,6 +209,17 @@ curl -X POST http://localhost:1933/api/v1/resources \ \"to\": \"viking://resources/guide.md\", \"reason\": \"User guide\" }" + +# Add a Feishu document with a one-time user access token +curl -X POST http://localhost:1933/api/v1/resources \ + -H "Content-Type: application/json" \ + -H "X-API-Key: your-key" \ + -d '{ + "path": "https://example.feishu.cn/docx/doc_token", + "args": { + "feishu_access_token": "u-..." + } + }' ``` **Python SDK** @@ -244,6 +258,12 @@ client.add_resource( to="viking://resources/guide.md", watch_interval=60 # Update every 60 minutes ) + +# Add a Feishu document with a one-time user access token +client.add_resource( + "https://example.feishu.cn/docx/doc_token", + args={"feishu_access_token": "u-..."}, +) ``` **CLI** @@ -267,6 +287,9 @@ ov add-resource https://github.com/example/repo.git --watch-interval 60 # Cancel scheduled updates ov add-resource https://github.com/example/repo.git --to viking://resources/guide.md --watch-interval 0 +# Add a Feishu document with a one-time user access token +ov add-resource https://example.feishu.cn/docx/doc_token --args feishu_access_token:u-... + # Add with parent directory (parent must exist) ov add-resource ./documents/guide.md --parent viking://resources/docs diff --git a/docs/en/guides/06-mcp-integration.md b/docs/en/guides/06-mcp-integration.md index f20cd4855d..eec56ac21d 100644 --- a/docs/en/guides/06-mcp-integration.md +++ b/docs/en/guides/06-mcp-integration.md @@ -117,7 +117,7 @@ Once connected, OpenViking exposes 14 tools: | `read` | Read one or more `viking://` URIs | `uris` (single string or array) | | `list` | List entries under a `viking://` directory | `uri`, `recursive` (optional) | | `store` | Store messages into long-term memory (triggers extraction) | `messages` (list of `{role, content}`) | -| `add_resource` | Add a local file or URL as a resource (local files trigger a progressive upload flow) | `path`, `temp_file_id` (optional), `description` (optional), `watch_interval` (optional, minutes — auto-refresh cadence for remote URLs), `to` (optional, target `viking://resources/...` URI; if omitted when `watch_interval > 0`, the watch auto-binds to the resource's created URI) | +| `add_resource` | Add a local file or URL as a resource (local files trigger a progressive upload flow) | `path`, `temp_file_id` (optional), `description` (optional), `watch_interval` (optional, minutes — auto-refresh cadence for remote URLs), `to` (optional, target `viking://resources/...` URI; if omitted when `watch_interval > 0`, the watch auto-binds to the resource's created URI), `args` (optional parser-specific options, such as `{"feishu_access_token":"u-..."}` for one-time Feishu user-token imports) | | `list_watches` | List watch tasks (auto-refresh subscriptions) visible to the current agent. Each entry shows target URI, refresh interval (minutes), active/paused status, and next scheduled execution time | none | | `cancel_watch` | Cancel (delete) a watch task by its target URI. To change the cadence or pause temporarily, cancel and re-add with a new `watch_interval` | `to_uri` (must match the watch task's `to` value, e.g. `viking://resources/...`) | | `grep` | Regex content search across `viking://` files | `uri`, `pattern` (string), `case_insensitive` | @@ -130,6 +130,8 @@ Once connected, OpenViking exposes 14 tools: > **Note**: MCP exposes the minimum closure for watch management (`list_watches` + `cancel_watch`). Pause / resume / trigger and the unified `update` verb are intentionally not exposed here — use the REST `/api/v1/watches/*` endpoints or the `ov task watch` CLI for those operations. +> Feishu/Lark imports with `args.feishu_access_token` are one-time only. They cannot be combined with `watch_interval > 0` because OpenViking does not store or refresh user tokens. + ### Adding local-file resources (progressive upload) The `add_resource` tool accepts both **remote URLs** and **local file paths**, handled differently: diff --git a/docs/zh/api/02-resources.md b/docs/zh/api/02-resources.md index 87ded1b511..ce295ce850 100644 --- a/docs/zh/api/02-resources.md +++ b/docs/zh/api/02-resources.md @@ -42,7 +42,7 @@ OpenViking 支持多种资源类型,按照功能分类如下: 云文档类 | 类型 | 说明 | |------|------| -| 飞书/Lark | URL 方式,支持 docx, wiki, sheets, bitable,需要配置 FEISHU_APP_ID 和 FEISHU_APP_SECRET | +| 飞书/Lark | URL 方式,支持 docx, wiki, sheets, bitable。默认使用 FEISHU_APP_ID 和 FEISHU_APP_SECRET 应用凭证;一次性用户 token 导入可传 `args.feishu_access_token` | ### 资源处理流程 @@ -151,6 +151,7 @@ URL/文件 Parser TreeBuilder AGFS Summarizer/Vector | exclude | string | 否 | None | 排除的文件模式(glob) | | directly_upload_media | bool | 否 | True | 是否直接上传媒体文件 | | preserve_structure | bool | 否 | None | 是否保留目录结构 | +| args | object | 否 | `{}` | 传给特定 parser/accessor 的导入参数。`path`、`to`、`watch_interval`、`include`、`exclude` 等 `add_resource` 核心字段不能放入 `args` | | watch_interval | float | 否 | 0 | 定时更新间隔(分钟)。>0 创建任务;≤0 取消任务;显式 `to` 优先,否则绑定本次导入的 `root_uri` | | telemetry | TelemetryRequest | 否 | False | 是否返回遥测数据 | @@ -160,6 +161,8 @@ URL/文件 Parser TreeBuilder AGFS Summarizer/Vector - 只有 Git 仓库来源在 `wait=false` 时使用完整后台导入;OpenViking 会先完成仓库 preflight 和目标规划,再返回 `task_id`。 - 其他来源在 `wait=false` 时会在响应前完成来源解析、目标解析和 AGFS 写入,仅 semantic 与 embedding 队列继续异步处理。 - `watch_interval > 0` 时,如果指定了 `to`,监控任务绑定该目标;如果未指定 `to`,监控任务绑定本次导入返回的 `root_uri`。如果无法得到稳定 `root_uri`,请求会报错并要求显式传 `to`。 +- 飞书/Lark 用户 token 导入通过 `args={"feishu_access_token": "u-..."}` 传入。该模式只支持一次性导入:`watch_interval > 0` 会被拒绝,因为 OpenViking 不保存也不刷新用户 token。 +- 飞书/Lark 仍需要应用凭证来构造 Lark client。未传 `args.feishu_access_token` 时,OpenViking 保持原有应用凭证流程,由 SDK 使用 `app_id` 和 `app_secret` 自动获取 tenant access token。 - 本地目录输入会遵循 `.gitignore`(根目录和子目录,标准 Git 语义);`ignore_dirs`、`include`、`exclude` 会在此基础上进一步过滤。 - 如果要直接创建或更新纯文本内容,请使用 [content/write](03-filesystem.md#write),不要使用 `add_resource`。资源导入和内容写入后都会自动刷新语义与 embedding。 @@ -199,6 +202,17 @@ curl -X POST http://localhost:1933/api/v1/resources \ \"to\": \"viking://resources/guide.md\", \"reason\": \"User guide\" }" + +# 使用一次性用户 access token 添加飞书文档 +curl -X POST http://localhost:1933/api/v1/resources \ + -H "Content-Type: application/json" \ + -H "X-API-Key: your-key" \ + -d '{ + "path": "https://example.feishu.cn/docx/doc_token", + "args": { + "feishu_access_token": "u-..." + } + }' ``` **Python SDK** @@ -237,6 +251,12 @@ client.add_resource( to="viking://resources/guide.md", watch_interval=60 # 每60分钟更新一次 ) + +# 使用一次性用户 access token 添加飞书文档 +client.add_resource( + "https://example.feishu.cn/docx/doc_token", + args={"feishu_access_token": "u-..."}, +) ``` **CLI** @@ -260,6 +280,9 @@ ov add-resource https://github.com/example/repo.git --watch-interval 60 # 取消定时更新 ov add-resource https://github.com/example/repo.git --to viking://resources/my_repo --watch-interval 0 +# 使用一次性用户 access token 添加飞书文档 +ov add-resource https://example.feishu.cn/docx/doc_token --args feishu_access_token:u-... + # 添加到指定父目录(父目录必须存在) ov add-resource ./documents/guide.md --parent viking://resources/docs diff --git a/docs/zh/guides/06-mcp-integration.md b/docs/zh/guides/06-mcp-integration.md index 2b56d7f1a3..f05a2dcc6a 100644 --- a/docs/zh/guides/06-mcp-integration.md +++ b/docs/zh/guides/06-mcp-integration.md @@ -109,7 +109,7 @@ claude mcp add --transport http openviking \ | `read` | 读取一个或多个 `viking://` URI 的内容 | `uris`(单个字符串或数组) | | `list` | 列出 `viking://` 目录下的条目 | `uri`, `recursive`(可选) | | `store` | 存储消息到长期记忆(触发记忆提取) | `messages`(`{role, content}` 列表) | -| `add_resource` | 添加本地文件或 URL 作为资源(本地文件触发渐进式上传流) | `path`, `temp_file_id`(可选), `description`(可选), `watch_interval`(可选,分钟数 — 远程 URL 的自动刷新周期), `to`(可选,目标 `viking://resources/...` URI;`watch_interval > 0` 时若省略 `to`,watch 将自动绑定到本次 add 创建的资源 URI) | +| `add_resource` | 添加本地文件或 URL 作为资源(本地文件触发渐进式上传流) | `path`, `temp_file_id`(可选), `description`(可选), `watch_interval`(可选,分钟数 — 远程 URL 的自动刷新周期), `to`(可选,目标 `viking://resources/...` URI;`watch_interval > 0` 时若省略 `to`,watch 将自动绑定到本次 add 创建的资源 URI), `args`(可选,特定 parser 参数,例如飞书一次性用户 token 导入使用 `{"feishu_access_token":"u-..."}`) | | `list_watches` | 列出当前 Agent 可见的 watch 任务(自动刷新订阅),每行显示目标 URI、刷新间隔(分钟)、active/paused 状态以及下一次调度时间 | 无 | | `cancel_watch` | 按目标 URI 取消(删除)watch 任务。若需调整刷新周期或临时暂停,请取消后使用新的 `watch_interval` 重新添加 | `to_uri`(必须匹配 watch 任务的 `to` 值,例如 `viking://resources/...`) | | `grep` | 在 `viking://` 文件中进行正则内容搜索 | `uri`, `pattern`(字符串), `case_insensitive` | @@ -122,6 +122,8 @@ claude mcp add --transport http openviking \ > **注**:MCP 仅暴露 watch 管理的最小闭包(`list_watches` + `cancel_watch`)。pause / resume / trigger 和统一的 `update` 动作刻意不在此处暴露,请通过 REST `/api/v1/watches/*` 接口或 `ov task watch` CLI 使用上述操作。 +> 使用 `args.feishu_access_token` 的飞书/Lark 导入只支持一次性导入,不能和 `watch_interval > 0` 组合,因为 OpenViking 不保存也不刷新用户 token。 + ### 添加本地文件资源(渐进式上传) `add_resource` 工具同时接受**远程 URL** 和**本地文件路径**。两者的处理路径不同: diff --git a/openviking/async_client.py b/openviking/async_client.py index 3312475832..c99bcbf6e1 100644 --- a/openviking/async_client.py +++ b/openviking/async_client.py @@ -267,6 +267,7 @@ async def add_resource( build_index: bool = True, summarize: bool = False, watch_interval: float = 0, + args: Optional[Dict[str, Any]] = None, telemetry: TelemetryRequest = False, **kwargs, ) -> Dict[str, Any]: @@ -301,6 +302,7 @@ async def add_resource( summarize=summarize, telemetry=telemetry, watch_interval=watch_interval, + args=args, **kwargs, ) diff --git a/openviking/client/local.py b/openviking/client/local.py index ca56a75162..ebc34f36f4 100644 --- a/openviking/client/local.py +++ b/openviking/client/local.py @@ -102,6 +102,7 @@ async def add_resource( summarize: bool = False, telemetry: TelemetryRequest = False, watch_interval: float = 0, + args: Optional[Dict[str, Any]] = None, **kwargs, ) -> Dict[str, Any]: """Add resource to OpenViking.""" @@ -123,6 +124,7 @@ async def add_resource( build_index=build_index, summarize=summarize, watch_interval=watch_interval, + args=args, **kwargs, ), ) diff --git a/openviking/parse/accessors/feishu_accessor.py b/openviking/parse/accessors/feishu_accessor.py index 1b8cf272bb..90401b0b85 100644 --- a/openviking/parse/accessors/feishu_accessor.py +++ b/openviking/parse/accessors/feishu_accessor.py @@ -142,6 +142,7 @@ class FeishuAccessor(DataAccessor): def __init__(self): """Initialize Feishu accessor.""" self._client = None + self._user_token_client = None self._config = None @property @@ -174,10 +175,14 @@ async def access(self, source: Union[str, Path], **kwargs) -> LocalResource: LocalResource pointing to the temporary Markdown file """ source_str = str(source) + feishu_access_token = kwargs.get("feishu_access_token") try: # Fetch the document and convert to Markdown - doc = await self._fetch_document(source_str) + doc = await self._fetch_document( + source_str, + feishu_access_token=feishu_access_token, + ) # Create temporary file temp_file = tempfile.NamedTemporaryFile( @@ -209,7 +214,12 @@ async def access(self, source: Union[str, Path], **kwargs) -> LocalResource: logger.error(f"[FeishuAccessor] Failed to access {source}: {e}", exc_info=True) raise - async def _fetch_document(self, url: str) -> FeishuDocument: + async def _fetch_document( + self, + url: str, + *, + feishu_access_token: Optional[str] = None, + ) -> FeishuDocument: """ Fetch a Feishu document and convert to Markdown. @@ -223,7 +233,11 @@ async def _fetch_document(self, url: str) -> FeishuDocument: if doc_type == "wiki": # Resolve wiki node to actual document type - real_type, real_token, title = await asyncio.to_thread(self._resolve_wiki_node, token) + real_type, real_token, title = await asyncio.to_thread( + self._resolve_wiki_node, + token, + feishu_access_token, + ) doc_type, token = real_type, real_token meta["wiki_resolved"] = True @@ -235,7 +249,11 @@ async def _fetch_document(self, url: str) -> FeishuDocument: ) # Call the handler (in thread pool since lark-oapi is sync) - markdown, doc_title = await asyncio.to_thread(self._parse_docx, token) + markdown, doc_title = await asyncio.to_thread( + self._parse_docx, + token, + feishu_access_token, + ) if title: doc_title = title @@ -292,9 +310,11 @@ def _get_config(self): self._config = get_openviking_config().feishu return self._config - def _get_client(self): + def _get_client(self, *, use_user_token: bool = False): """Lazy-init lark-oapi client.""" - if self._client is None: + cache_attr = "_user_token_client" if use_user_token else "_client" + client = getattr(self, cache_attr) + if client is None: try: import lark_oapi as lark except ImportError: @@ -311,14 +331,28 @@ def _get_client(self): "FEISHU_APP_SECRET environment variables, or configure in ov.conf." ) domain = config.domain or "https://open.feishu.cn" - self._client = ( - lark.Client.builder().app_id(app_id).app_secret(app_secret).domain(domain).build() - ) - return self._client + builder = lark.Client.builder().app_id(app_id).app_secret(app_secret).domain(domain) + if use_user_token: + builder = builder.enable_set_token(True) + client = builder.build() + setattr(self, cache_attr, client) + return client + + @staticmethod + def _user_request_option(feishu_access_token: Optional[str]): + if not feishu_access_token: + return None + from lark_oapi.core.model import RequestOption + + return RequestOption.builder().user_access_token(feishu_access_token).build() # ========== Wiki Resolution ========== - def _resolve_wiki_node(self, token: str) -> Tuple[str, str, Optional[str]]: + def _resolve_wiki_node( + self, + token: str, + feishu_access_token: Optional[str] = None, + ) -> Tuple[str, str, Optional[str]]: """ Resolve wiki token to actual document type, token, and title. @@ -327,9 +361,13 @@ def _resolve_wiki_node(self, token: str) -> Tuple[str, str, Optional[str]]: """ from lark_oapi.api.wiki.v2 import GetNodeSpaceRequest - client = self._get_client() + client = self._get_client(use_user_token=bool(feishu_access_token)) request = GetNodeSpaceRequest.builder().token(token).build() - response = client.wiki.v2.space.get_node(request) + option = self._user_request_option(feishu_access_token) + if option is None: + response = client.wiki.v2.space.get_node(request) + else: + response = client.wiki.v2.space.get_node(request, option) if not response.success(): raise RuntimeError( f"Failed to resolve wiki node {token}: code={response.code}, msg={response.msg}" @@ -346,14 +384,21 @@ def _resolve_wiki_node(self, token: str) -> Tuple[str, str, Optional[str]]: # ========== Docx Parsing ========== - def _parse_docx(self, document_id: str) -> Tuple[str, str]: + def _parse_docx( + self, + document_id: str, + feishu_access_token: Optional[str] = None, + ) -> Tuple[str, str]: """ Fetch all blocks and convert to Markdown. Returns: (markdown_content, document_title) """ - blocks = self._fetch_all_blocks(document_id) + blocks = self._fetch_all_blocks( + document_id, + feishu_access_token=feishu_access_token, + ) if not blocks: return "", "Untitled" @@ -389,11 +434,16 @@ def _parse_docx(self, document_id: str) -> Tuple[str, str]: return markdown, doc_title - def _fetch_all_blocks(self, document_id: str) -> list: + def _fetch_all_blocks( + self, + document_id: str, + *, + feishu_access_token: Optional[str] = None, + ) -> list: """Fetch all blocks with pagination. Returns list of SDK block objects.""" from lark_oapi.api.docx.v1 import ListDocumentBlockRequest - client = self._get_client() + client = self._get_client(use_user_token=bool(feishu_access_token)) all_blocks = [] page_token = None @@ -408,7 +458,11 @@ def _fetch_all_blocks(self, document_id: str) -> list: builder = builder.page_token(page_token) request = builder.build() - response = client.docx.v1.document_block.list(request) + option = self._user_request_option(feishu_access_token) + if option is None: + response = client.docx.v1.document_block.list(request) + else: + response = client.docx.v1.document_block.list(request, option) if not response.success(): raise RuntimeError( diff --git a/openviking/server/mcp_endpoint.py b/openviking/server/mcp_endpoint.py index d222606a98..036f83df27 100644 --- a/openviking/server/mcp_endpoint.py +++ b/openviking/server/mcp_endpoint.py @@ -21,7 +21,7 @@ import os from contextlib import asynccontextmanager from datetime import datetime, timezone -from typing import List, Literal, Optional +from typing import Any, List, Literal, Optional from urllib.parse import quote from mcp.server.fastmcp import FastMCP @@ -439,6 +439,7 @@ async def add_resource( description: str = "", watch_interval: float = 0, to: str = "", + args: Optional[dict[str, Any]] = None, ) -> str: """Add a resource to OpenViking. Asynchronous — processing happens in the background. @@ -472,6 +473,8 @@ async def add_resource( to: Target URI under viking://resources/ (e.g. "viking://resources/volcengine/OpenViking"). Leave empty to let the system derive a URI from the source. + args: Parser-specific import options. For Feishu user-token imports, pass + {"feishu_access_token": "..."}. """ from openviking.server.local_input_guard import require_remote_resource_source @@ -504,6 +507,7 @@ async def add_resource( wait=False, allow_local_path_resolution=True, enforce_public_remote_targets=True, + args=args, ) except Exception as exc: await store.mark_failed(resolved, ctx) @@ -540,6 +544,7 @@ async def add_resource( wait=False, watch_interval=watch_interval, enforce_public_remote_targets=True, + args=args, ) except Exception as exc: return f"Error adding resource: {exc}" diff --git a/openviking/server/routers/resources.py b/openviking/server/routers/resources.py index 044976cfb9..725f2586f0 100644 --- a/openviking/server/routers/resources.py +++ b/openviking/server/routers/resources.py @@ -5,7 +5,7 @@ from typing import Any, Dict, Optional from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, Request, UploadFile -from pydantic import BaseModel, ConfigDict, model_validator +from pydantic import BaseModel, ConfigDict, Field, model_validator from openviking.core.path_variables import resolve_path_variables from openviking.server.auth import get_request_context @@ -50,6 +50,8 @@ class AddResourceRequest(BaseModel): exclude: Glob pattern for files to exclude during parsing. directly_upload_media: Whether to directly upload media files. Default is True. preserve_structure: Whether to preserve directory structure when adding directories. + args: Parser-specific import options. For Feishu user-token imports, pass + {"feishu_access_token": "..."}. watch_interval: Watch interval in minutes for automatic resource monitoring. - watch_interval > 0: Creates or updates a watch task. The resource will be automatically re-processed at the specified interval. @@ -81,6 +83,7 @@ class AddResourceRequest(BaseModel): exclude: Optional[str] = None directly_upload_media: bool = True preserve_structure: Optional[bool] = None + args: Dict[str, Any] = Field(default_factory=dict) telemetry: TelemetryRequest = False watch_interval: float = 0 @@ -247,6 +250,7 @@ async def _add() -> dict[str, Any]: timeout=request.timeout, allow_local_path_resolution=allow_local_path_resolution, enforce_public_remote_targets=True, + args=request.args, **kwargs, ) except Exception: diff --git a/openviking/service/resource_service.py b/openviking/service/resource_service.py index e55b68f6cf..6be559aab1 100644 --- a/openviking/service/resource_service.py +++ b/openviking/service/resource_service.py @@ -56,6 +56,39 @@ logger = get_logger(__name__) +_ADD_RESOURCE_ARGS_RESERVED_FIELDS = frozenset( + { + "path", + "ctx", + "to", + "parent", + "reason", + "instruction", + "wait", + "timeout", + "build_index", + "summarize", + "watch_interval", + "skip_watch_management", + "allow_local_path_resolution", + "enforce_public_remote_targets", + "resource_lock", + "stage_callback", + "args", + "strict", + "source_name", + "ignore_dirs", + "include", + "exclude", + "directly_upload_media", + "preserve_structure", + "create_parent", + "telemetry", + "request_validator", + } +) + + @dataclass class _ResourceSourceInfo: source_name: Optional[str] = None @@ -111,6 +144,39 @@ def _sanitize_watch_processor_kwargs(self, processor_kwargs: Dict[str, Any]) -> sanitized[key] = value return sanitized + def _normalize_add_resource_args( + self, + args: Optional[Dict[str, Any]], + *, + watch_interval: float, + ) -> Dict[str, Any]: + if args is None: + return {} + if not isinstance(args, dict): + raise InvalidArgumentError("args must be an object.") + if not args: + return {} + + reserved = sorted(set(args).intersection(_ADD_RESOURCE_ARGS_RESERVED_FIELDS)) + if reserved: + raise InvalidArgumentError( + "args cannot contain core add_resource fields: " + ", ".join(reserved) + ) + + normalized = dict(args) + token = normalized.get("feishu_access_token") + if token is not None: + if not isinstance(token, str) or not token.strip(): + raise InvalidArgumentError("args.feishu_access_token must be a non-empty string.") + if watch_interval > 0: + raise InvalidArgumentError( + "args.feishu_access_token only supports one-time import; " + "watch_interval must be 0." + ) + normalized["feishu_access_token"] = token.strip() + + return normalized + def _ensure_initialized(self) -> None: """Ensure all dependencies are initialized.""" if not self._resource_processor: @@ -145,10 +211,12 @@ async def enqueue_git_add_resource( skip_watch_management: bool = False, allow_local_path_resolution: bool = True, enforce_public_remote_targets: bool = False, + args: Optional[Dict[str, Any]] = None, **kwargs, ) -> Dict[str, Any]: """Start background ingestion for Git repositories while reserving the target URI.""" self._ensure_initialized() + kwargs.update(self._normalize_add_resource_args(args, watch_interval=watch_interval)) if to: to = resolve_path_variables(to) @@ -422,6 +490,7 @@ async def add_resource( enforce_public_remote_targets: bool = False, resource_lock: Optional[LockLease] = None, stage_callback: Optional[Callable[[str], Any]] = None, + args: Optional[Dict[str, Any]] = None, **kwargs, ) -> Dict[str, Any]: """Add resource to OpenViking (only supports resources scope). @@ -451,6 +520,7 @@ async def add_resource( avoid recursive watch task creation during scheduled execution) enforce_public_remote_targets: When True, reject non-public remote hosts and validate each outbound HTTP request URL during fetch. + args: Parser-specific options forwarded to the parser chain. **kwargs: Extra options forwarded to the parser chain Returns: @@ -461,6 +531,7 @@ async def add_resource( InvalidArgumentError: If the URI scope is not 'resources' """ self._ensure_initialized() + kwargs.update(self._normalize_add_resource_args(args, watch_interval=watch_interval)) if not wait and is_git_repo_url(path): return await self.enqueue_git_add_resource( path=path, diff --git a/openviking/sync_client.py b/openviking/sync_client.py index 6ecc91726a..853ae85892 100644 --- a/openviking/sync_client.py +++ b/openviking/sync_client.py @@ -176,6 +176,7 @@ def add_resource( timeout: float = None, build_index: bool = True, summarize: bool = False, + args: Optional[Dict[str, Any]] = None, telemetry: TelemetryRequest = False, **kwargs, ) -> Dict[str, Any]: @@ -202,6 +203,7 @@ def add_resource( timeout=timeout, build_index=build_index, summarize=summarize, + args=args, telemetry=telemetry, **kwargs, ) diff --git a/openviking_cli/client/base.py b/openviking_cli/client/base.py index 066f12b6d1..f9c866c8bd 100644 --- a/openviking_cli/client/base.py +++ b/openviking_cli/client/base.py @@ -42,6 +42,7 @@ async def add_resource( wait: bool = False, timeout: Optional[float] = None, watch_interval: float = 0, + args: Optional[Dict[str, Any]] = None, telemetry: TelemetryRequest = False, ) -> Dict[str, Any]: """Add resource to OpenViking.""" diff --git a/openviking_cli/client/http.py b/openviking_cli/client/http.py index cb84fd31e0..9cc0efb314 100644 --- a/openviking_cli/client/http.py +++ b/openviking_cli/client/http.py @@ -393,6 +393,7 @@ async def add_resource( directly_upload_media: bool = True, preserve_structure: Optional[bool] = None, watch_interval: float = 0, + args: Optional[Dict[str, Any]] = None, telemetry: TelemetryRequest = False, ) -> Dict[str, Any]: """Add resource to OpenViking.""" @@ -413,6 +414,7 @@ async def add_resource( "exclude": exclude, "directly_upload_media": directly_upload_media, "watch_interval": watch_interval, + "args": args or {}, "telemetry": telemetry, } if preserve_structure is not None: diff --git a/openviking_cli/client/sync_http.py b/openviking_cli/client/sync_http.py index a624896f83..df62e93a37 100644 --- a/openviking_cli/client/sync_http.py +++ b/openviking_cli/client/sync_http.py @@ -233,6 +233,7 @@ def add_resource( exclude: Optional[str] = None, directly_upload_media: bool = True, watch_interval: float = 0, + args: Optional[Dict[str, Any]] = None, telemetry: TelemetryRequest = False, ) -> Dict[str, Any]: """Add resource to OpenViking.""" @@ -253,6 +254,7 @@ def add_resource( exclude=exclude, directly_upload_media=directly_upload_media, watch_interval=watch_interval, + args=args, telemetry=telemetry, ) ) diff --git a/tests/client/test_http_client_local_upload.py b/tests/client/test_http_client_local_upload.py index 5bed45efa9..698e25aa31 100644 --- a/tests/client/test_http_client_local_upload.py +++ b/tests/client/test_http_client_local_upload.py @@ -99,6 +99,26 @@ async def fake_upload(_path: str) -> str: assert "path" not in call["json"] +@pytest.mark.asyncio +async def test_add_resource_forwards_args_for_remote_url(): + client = AsyncHTTPClient(url="http://127.0.0.1:1933") + fake_http = _FakeHTTPClient() + client._http = fake_http + client._handle_response_data = lambda _response: { + "result": {"root_uri": "viking://resources/demo"} + } + + await client.add_resource( + "https://example.feishu.cn/docx/doc_token", + args={"feishu_access_token": "u-test"}, + ) + + call = fake_http.calls[-1] + assert call["path"] == "/api/v1/resources" + assert call["json"]["path"] == "https://example.feishu.cn/docx/doc_token" + assert call["json"]["args"] == {"feishu_access_token": "u-test"} + + @pytest.mark.asyncio async def test_import_ovpack_uploads_local_file_even_when_url_is_localhost(tmp_path): pack_file = tmp_path / "demo.ovpack" diff --git a/tests/parse/test_feishu_accessor.py b/tests/parse/test_feishu_accessor.py new file mode 100644 index 0000000000..4c4c66cf0a --- /dev/null +++ b/tests/parse/test_feishu_accessor.py @@ -0,0 +1,174 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 +"""Tests for FeishuAccessor user token handling.""" + +import sys +from types import ModuleType, SimpleNamespace +from unittest.mock import MagicMock + +from openviking.parse.accessors.feishu_accessor import FeishuAccessor + + +class _SuccessResponse: + def __init__(self, data): + self.data = data + self.code = 0 + self.msg = "" + + @staticmethod + def success(): + return True + + +class _FakeRequestOption: + def __init__(self): + self.user_access_token = None + + @staticmethod + def builder(): + return _FakeRequestOptionBuilder() + + +class _FakeRequestOptionBuilder: + def __init__(self): + self._option = _FakeRequestOption() + + def user_access_token(self, token): + self._option.user_access_token = token + return self + + def build(self): + return self._option + + +class _FakeListDocumentBlockRequest: + @staticmethod + def builder(): + return _FakeListDocumentBlockRequestBuilder() + + +class _FakeListDocumentBlockRequestBuilder: + def __init__(self): + self._request = SimpleNamespace(document_id=None, page_token=None) + + def document_id(self, document_id): + self._request.document_id = document_id + return self + + def page_size(self, _page_size): + return self + + def document_revision_id(self, _revision_id): + return self + + def page_token(self, page_token): + self._request.page_token = page_token + return self + + def build(self): + return self._request + + +class _FakeGetNodeSpaceRequest: + @staticmethod + def builder(): + return _FakeGetNodeSpaceRequestBuilder() + + +class _FakeGetNodeSpaceRequestBuilder: + def __init__(self): + self._request = SimpleNamespace(token=None) + + def token(self, token): + self._request.token = token + return self + + def build(self): + return self._request + + +def _package(name): + module = ModuleType(name) + module.__path__ = [] + return module + + +def _install_fake_lark_modules(monkeypatch): + modules = { + "lark_oapi": _package("lark_oapi"), + "lark_oapi.api": _package("lark_oapi.api"), + "lark_oapi.api.docx": _package("lark_oapi.api.docx"), + "lark_oapi.api.wiki": _package("lark_oapi.api.wiki"), + "lark_oapi.core": _package("lark_oapi.core"), + } + docx_v1 = ModuleType("lark_oapi.api.docx.v1") + docx_v1.ListDocumentBlockRequest = _FakeListDocumentBlockRequest + wiki_v2 = ModuleType("lark_oapi.api.wiki.v2") + wiki_v2.GetNodeSpaceRequest = _FakeGetNodeSpaceRequest + core_model = ModuleType("lark_oapi.core.model") + core_model.RequestOption = _FakeRequestOption + modules.update( + { + "lark_oapi.api.docx.v1": docx_v1, + "lark_oapi.api.wiki.v2": wiki_v2, + "lark_oapi.core.model": core_model, + } + ) + for name, module in modules.items(): + monkeypatch.setitem(sys.modules, name, module) + + +def test_fetch_all_blocks_uses_user_access_token_option(monkeypatch): + _install_fake_lark_modules(monkeypatch) + list_blocks = MagicMock( + return_value=_SuccessResponse( + SimpleNamespace(items=[], has_more=False, page_token=None), + ) + ) + accessor = FeishuAccessor() + accessor._user_token_client = SimpleNamespace( + docx=SimpleNamespace(v1=SimpleNamespace(document_block=SimpleNamespace(list=list_blocks))) + ) + + blocks = accessor._fetch_all_blocks("doc_token", feishu_access_token="u-test") + + assert blocks == [] + request, option = list_blocks.call_args.args + assert request.document_id == "doc_token" + assert option.user_access_token == "u-test" + + +def test_fetch_all_blocks_keeps_default_app_token_call_shape(monkeypatch): + _install_fake_lark_modules(monkeypatch) + list_blocks = MagicMock( + return_value=_SuccessResponse( + SimpleNamespace(items=[], has_more=False, page_token=None), + ) + ) + accessor = FeishuAccessor() + accessor._client = SimpleNamespace( + docx=SimpleNamespace(v1=SimpleNamespace(document_block=SimpleNamespace(list=list_blocks))) + ) + + blocks = accessor._fetch_all_blocks("doc_token") + + assert blocks == [] + assert len(list_blocks.call_args.args) == 1 + assert list_blocks.call_args.args[0].document_id == "doc_token" + + +def test_resolve_wiki_node_uses_user_access_token_option(monkeypatch): + _install_fake_lark_modules(monkeypatch) + node = SimpleNamespace(obj_type="doc", obj_token="doc_token", title="Title") + get_node = MagicMock(return_value=_SuccessResponse(SimpleNamespace(node=node))) + accessor = FeishuAccessor() + accessor._user_token_client = SimpleNamespace( + wiki=SimpleNamespace(v2=SimpleNamespace(space=SimpleNamespace(get_node=get_node))) + ) + + doc_type, token, title = accessor._resolve_wiki_node("wiki_token", "u-test") + + assert (doc_type, token, title) == ("docx", "doc_token", "Title") + request, option = get_node.call_args.args + assert request.token == "wiki_token" + assert option.user_access_token == "u-test" diff --git a/tests/server/test_api_resources.py b/tests/server/test_api_resources.py index c3c2ce516e..2991abca61 100644 --- a/tests/server/test_api_resources.py +++ b/tests/server/test_api_resources.py @@ -69,6 +69,34 @@ async def test_add_resource_with_wait( assert "root_uri" in body["result"] +async def test_add_resource_forwards_args_to_service( + client: httpx.AsyncClient, + service, + monkeypatch, +): + seen = {} + + async def fake_add_resource(**kwargs): + seen.update(kwargs) + return { + "status": "success", + "root_uri": "viking://resources/demo", + } + + monkeypatch.setattr(service.resources, "add_resource", fake_add_resource) + + resp = await client.post( + "/api/v1/resources", + json={ + "path": "https://example.com/demo.md", + "args": {"feishu_access_token": "u-test"}, + }, + ) + + assert resp.status_code == 200 + assert seen["args"] == {"feishu_access_token": "u-test"} + + async def test_add_resource_with_telemetry_wait( client: httpx.AsyncClient, sample_markdown_file, diff --git a/tests/service/test_resource_service_watch.py b/tests/service/test_resource_service_watch.py index d1dd210a3e..390eef8e6e 100644 --- a/tests/service/test_resource_service_watch.py +++ b/tests/service/test_resource_service_watch.py @@ -11,7 +11,7 @@ from openviking.resource.watch_manager import WatchManager from openviking.server.identity import RequestContext, Role from openviking.service.resource_service import ResourceService -from openviking_cli.exceptions import ConflictError +from openviking_cli.exceptions import ConflictError, InvalidArgumentError from openviking_cli.session.user_id import UserIdentifier @@ -27,7 +27,11 @@ async def get_task_by_uri(service: ResourceService, to_uri: str, ctx: RequestCon class MockResourceProcessor: """Mock ResourceProcessor for testing.""" + def __init__(self): + self.calls = [] + async def process_resource(self, **kwargs): + self.calls.append(kwargs) return {"root_uri": kwargs.get("to") or "viking://resources/test"} @@ -209,6 +213,68 @@ async def test_no_watch_task_created_with_negative_interval( assert task is None +class TestAddResourceArgs: + """Tests for parser-specific add_resource args.""" + + @pytest.mark.asyncio + async def test_forwards_args_to_resource_processor( + self, resource_service: ResourceService, request_context: RequestContext + ): + await resource_service.add_resource( + path="/test/path", + ctx=request_context, + args={"feishu_access_token": " u-test "}, + ) + + processor = resource_service._resource_processor + assert processor.calls[-1]["feishu_access_token"] == "u-test" + + @pytest.mark.asyncio + async def test_rejects_feishu_access_token_with_watch( + self, resource_service: ResourceService, request_context: RequestContext + ): + with pytest.raises(InvalidArgumentError, match="one-time import"): + await resource_service.add_resource( + path="/test/path", + ctx=request_context, + watch_interval=30, + args={"feishu_access_token": "u-test"}, + ) + + @pytest.mark.asyncio + async def test_rejects_invalid_feishu_access_token( + self, resource_service: ResourceService, request_context: RequestContext + ): + with pytest.raises(InvalidArgumentError, match="non-empty string"): + await resource_service.add_resource( + path="/test/path", + ctx=request_context, + args={"feishu_access_token": 123}, + ) + + @pytest.mark.asyncio + async def test_rejects_non_object_args( + self, resource_service: ResourceService, request_context: RequestContext + ): + with pytest.raises(InvalidArgumentError, match="args must be an object"): + await resource_service.add_resource( + path="/test/path", + ctx=request_context, + args=[], + ) + + @pytest.mark.asyncio + async def test_rejects_core_add_resource_fields_in_args( + self, resource_service: ResourceService, request_context: RequestContext + ): + with pytest.raises(InvalidArgumentError, match="core add_resource fields"): + await resource_service.add_resource( + path="/test/path", + ctx=request_context, + args={"watch_interval": 30}, + ) + + class TestWatchTaskConflict: """Tests for watch task conflict detection.""" From 422da58c2a3e2b866e5f751962fddeb378ed8607 Mon Sep 17 00:00:00 2001 From: qin-ctx Date: Thu, 11 Jun 2026 13:59:45 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat(resource):=20=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E9=A3=9E=E4=B9=A6=E7=94=A8=E6=88=B7=20token=20watch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/en/api/02-resources.md | 41 +++- docs/en/guides/06-mcp-integration.md | 4 +- docs/zh/api/02-resources.md | 41 +++- docs/zh/guides/06-mcp-integration.md | 4 +- openviking/resource/feishu_watch_auth.py | 240 +++++++++++++++++++ openviking/resource/watch_manager.py | 37 ++- openviking/resource/watch_scheduler.py | 85 +++++-- openviking/server/mcp_endpoint.py | 5 +- openviking/server/routers/resources.py | 5 +- openviking/service/resource_service.py | 64 ++++- tests/resource/test_feishu_watch_auth.py | 137 +++++++++++ tests/resource/test_watch_manager.py | 61 +++++ tests/server/test_api_watches.py | 25 ++ tests/service/test_resource_service_watch.py | 96 +++++++- tests/service/test_watch_recovery.py | 130 ++++++++++ 15 files changed, 928 insertions(+), 47 deletions(-) create mode 100644 openviking/resource/feishu_watch_auth.py create mode 100644 tests/resource/test_feishu_watch_auth.py diff --git a/docs/en/api/02-resources.md b/docs/en/api/02-resources.md index 3a9ecc758c..f6ac9de6ac 100644 --- a/docs/en/api/02-resources.md +++ b/docs/en/api/02-resources.md @@ -47,7 +47,7 @@ OpenViking supports various resource types, categorized by functionality: | Type | Description | |------|-------------| -| Feishu/Lark | URL-based, supports docx, wiki, sheets, bitable. By default uses app credentials from FEISHU_APP_ID and FEISHU_APP_SECRET; one-time user-token imports can pass `args.feishu_access_token` | +| Feishu/Lark | URL-based, supports docx, wiki, sheets, bitable. By default uses app credentials from FEISHU_APP_ID and FEISHU_APP_SECRET; user-token imports can pass `args.feishu_access_token`, and user-token watches also pass `args.feishu_refresh_token` | ### Resource Processing Pipeline @@ -168,8 +168,11 @@ This endpoint is the core entry point for resource management, supporting adding - Only Git repository sources use full background import when `wait=false`; OpenViking performs repository preflight and target planning before returning the `task_id`. - Other sources with `wait=false` finish source parsing, target resolution, and AGFS writes before returning. Only semantic and embedding queues continue asynchronously. - When `watch_interval > 0`, the watch task binds to `to` if provided; otherwise it binds to the `root_uri` returned by this import. If no stable `root_uri` is available, the request fails and asks for an explicit `to`. -- Feishu/Lark user-token imports pass `args={"feishu_access_token": "u-..."}`. This mode is one-time only: `watch_interval > 0` is rejected because OpenViking does not store or refresh the user token. -- Feishu/Lark app credentials are still required to construct the Lark client. Without `args.feishu_access_token`, OpenViking keeps the existing app credential flow and the SDK obtains a tenant access token from `app_id` and `app_secret`. +- Feishu/Lark app-token imports do not pass `args.feishu_access_token`. OpenViking keeps the existing app credential flow and the SDK obtains an app/tenant token from `app_id` and `app_secret`. This mode supports both one-time imports and `watch_interval > 0`. +- Feishu/Lark one-time user-token imports pass `args={"feishu_access_token": "u-..."}` with `watch_interval <= 0`. OpenViking uses that user token only for the current import and does not store it. +- Feishu/Lark user-token watches pass `args={"feishu_access_token": "u-...", "feishu_refresh_token": "r-..."}` with `watch_interval > 0`. OpenViking stores the token state in the private watch task state, refreshes it with the configured Feishu app credentials, and uses the refreshed user token for later watch runs. +- Feishu/Lark user-token watches require `FEISHU_APP_ID` and `FEISHU_APP_SECRET` (or `feishu.app_id` and `feishu.app_secret` in `ov.conf`) because Feishu refresh tokens are bound to the app that issued them. The supplied user token must come from the same Feishu app configured in OpenViking. +- Watch task token state is stored in the internal `viking://resources/.watch_tasks.json` control file and is hidden from watch API/MCP/CLI responses. If VikingFS file encryption is enabled, this control file is encrypted at rest; otherwise the server-side control file contains plaintext token state. - For local directory inputs, scanning respects `.gitignore` files (root and nested) with standard Git semantics; `ignore_dirs`, `include`, and `exclude` further refine what is ingested. - To create or update plain text directly, use [content/write](03-filesystem.md#write) instead of `add_resource`. Semantic processing and embeddings are refreshed automatically after resource ingestion and content writes. @@ -220,6 +223,20 @@ curl -X POST http://localhost:1933/api/v1/resources \ "feishu_access_token": "u-..." } }' + +# Add a Feishu document with scheduled user-token refresh +curl -X POST http://localhost:1933/api/v1/resources \ + -H "Content-Type: application/json" \ + -H "X-API-Key: your-key" \ + -d '{ + "path": "https://example.feishu.cn/docx/doc_token", + "to": "viking://resources/feishu/doc", + "watch_interval": 1440, + "args": { + "feishu_access_token": "u-...", + "feishu_refresh_token": "r-..." + } + }' ``` **Python SDK** @@ -264,6 +281,17 @@ client.add_resource( "https://example.feishu.cn/docx/doc_token", args={"feishu_access_token": "u-..."}, ) + +# Add a Feishu document with scheduled user-token refresh +client.add_resource( + "https://example.feishu.cn/docx/doc_token", + to="viking://resources/feishu/doc", + watch_interval=1440, + args={ + "feishu_access_token": "u-...", + "feishu_refresh_token": "r-...", + }, +) ``` **CLI** @@ -290,6 +318,13 @@ ov add-resource https://github.com/example/repo.git --to viking://resources/guid # Add a Feishu document with a one-time user access token ov add-resource https://example.feishu.cn/docx/doc_token --args feishu_access_token:u-... +# Add a Feishu document with scheduled user-token refresh +ov add-resource https://example.feishu.cn/docx/doc_token \ + --to viking://resources/feishu/doc \ + --watch-interval 1440 \ + --args feishu_access_token:u-... \ + --args feishu_refresh_token:r-... + # Add with parent directory (parent must exist) ov add-resource ./documents/guide.md --parent viking://resources/docs diff --git a/docs/en/guides/06-mcp-integration.md b/docs/en/guides/06-mcp-integration.md index eec56ac21d..b792dd55f9 100644 --- a/docs/en/guides/06-mcp-integration.md +++ b/docs/en/guides/06-mcp-integration.md @@ -117,7 +117,7 @@ Once connected, OpenViking exposes 14 tools: | `read` | Read one or more `viking://` URIs | `uris` (single string or array) | | `list` | List entries under a `viking://` directory | `uri`, `recursive` (optional) | | `store` | Store messages into long-term memory (triggers extraction) | `messages` (list of `{role, content}`) | -| `add_resource` | Add a local file or URL as a resource (local files trigger a progressive upload flow) | `path`, `temp_file_id` (optional), `description` (optional), `watch_interval` (optional, minutes — auto-refresh cadence for remote URLs), `to` (optional, target `viking://resources/...` URI; if omitted when `watch_interval > 0`, the watch auto-binds to the resource's created URI), `args` (optional parser-specific options, such as `{"feishu_access_token":"u-..."}` for one-time Feishu user-token imports) | +| `add_resource` | Add a local file or URL as a resource (local files trigger a progressive upload flow) | `path`, `temp_file_id` (optional), `description` (optional), `watch_interval` (optional, minutes — auto-refresh cadence for remote URLs), `to` (optional, target `viking://resources/...` URI; if omitted when `watch_interval > 0`, the watch auto-binds to the resource's created URI), `args` (optional parser-specific options, such as `{"feishu_access_token":"u-..."}` for one-time Feishu user-token imports, or `{"feishu_access_token":"u-...","feishu_refresh_token":"r-..."}` for Feishu user-token watches) | | `list_watches` | List watch tasks (auto-refresh subscriptions) visible to the current agent. Each entry shows target URI, refresh interval (minutes), active/paused status, and next scheduled execution time | none | | `cancel_watch` | Cancel (delete) a watch task by its target URI. To change the cadence or pause temporarily, cancel and re-add with a new `watch_interval` | `to_uri` (must match the watch task's `to` value, e.g. `viking://resources/...`) | | `grep` | Regex content search across `viking://` files | `uri`, `pattern` (string), `case_insensitive` | @@ -130,7 +130,7 @@ Once connected, OpenViking exposes 14 tools: > **Note**: MCP exposes the minimum closure for watch management (`list_watches` + `cancel_watch`). Pause / resume / trigger and the unified `update` verb are intentionally not exposed here — use the REST `/api/v1/watches/*` endpoints or the `ov task watch` CLI for those operations. -> Feishu/Lark imports with `args.feishu_access_token` are one-time only. They cannot be combined with `watch_interval > 0` because OpenViking does not store or refresh user tokens. +> Feishu/Lark imports without `args.feishu_access_token` keep the existing app/tenant-token behavior and can be watched. Feishu/Lark one-time user-token imports pass only `args.feishu_access_token`; Feishu/Lark user-token watches must also pass `args.feishu_refresh_token` and require the same Feishu app credentials configured on the OpenViking server. ### Adding local-file resources (progressive upload) diff --git a/docs/zh/api/02-resources.md b/docs/zh/api/02-resources.md index ce295ce850..45109cabbf 100644 --- a/docs/zh/api/02-resources.md +++ b/docs/zh/api/02-resources.md @@ -42,7 +42,7 @@ OpenViking 支持多种资源类型,按照功能分类如下: 云文档类 | 类型 | 说明 | |------|------| -| 飞书/Lark | URL 方式,支持 docx, wiki, sheets, bitable。默认使用 FEISHU_APP_ID 和 FEISHU_APP_SECRET 应用凭证;一次性用户 token 导入可传 `args.feishu_access_token` | +| 飞书/Lark | URL 方式,支持 docx, wiki, sheets, bitable。默认使用 FEISHU_APP_ID 和 FEISHU_APP_SECRET 应用凭证;用户 token 导入可传 `args.feishu_access_token`,用户 token watch 还需传 `args.feishu_refresh_token` | ### 资源处理流程 @@ -161,8 +161,11 @@ URL/文件 Parser TreeBuilder AGFS Summarizer/Vector - 只有 Git 仓库来源在 `wait=false` 时使用完整后台导入;OpenViking 会先完成仓库 preflight 和目标规划,再返回 `task_id`。 - 其他来源在 `wait=false` 时会在响应前完成来源解析、目标解析和 AGFS 写入,仅 semantic 与 embedding 队列继续异步处理。 - `watch_interval > 0` 时,如果指定了 `to`,监控任务绑定该目标;如果未指定 `to`,监控任务绑定本次导入返回的 `root_uri`。如果无法得到稳定 `root_uri`,请求会报错并要求显式传 `to`。 -- 飞书/Lark 用户 token 导入通过 `args={"feishu_access_token": "u-..."}` 传入。该模式只支持一次性导入:`watch_interval > 0` 会被拒绝,因为 OpenViking 不保存也不刷新用户 token。 -- 飞书/Lark 仍需要应用凭证来构造 Lark client。未传 `args.feishu_access_token` 时,OpenViking 保持原有应用凭证流程,由 SDK 使用 `app_id` 和 `app_secret` 自动获取 tenant access token。 +- 飞书/Lark 应用 token 导入不传 `args.feishu_access_token`。OpenViking 保持原有应用凭证流程,由 SDK 使用 `app_id` 和 `app_secret` 自动获取 app/tenant token。该模式支持一次性导入和 `watch_interval > 0`。 +- 飞书/Lark 一次性用户 token 导入通过 `args={"feishu_access_token": "u-..."}` 传入,且 `watch_interval <= 0`。OpenViking 只在本次导入使用该用户 token,不保存。 +- 飞书/Lark 用户 token watch 通过 `args={"feishu_access_token": "u-...", "feishu_refresh_token": "r-..."}` 传入,且 `watch_interval > 0`。OpenViking 会把 token 状态保存在 watch task 私有状态里,用配置的飞书应用凭证刷新,并在后续 watch 重跑中使用刷新后的用户 token。 +- 飞书/Lark 用户 token watch 需要 `FEISHU_APP_ID` 和 `FEISHU_APP_SECRET`,或 `ov.conf` 中的 `feishu.app_id` 和 `feishu.app_secret`。飞书 refresh token 绑定签发它的应用,因此传入的用户 token 必须来自 OpenViking 当前配置的同一个飞书应用。 +- Watch task 的 token 状态保存在内部控制文件 `viking://resources/.watch_tasks.json` 中,不会出现在 watch API/MCP/CLI 返回里。若启用了 VikingFS 文件加密,该控制文件会静态加密;否则服务端控制文件中会包含明文 token 状态。 - 本地目录输入会遵循 `.gitignore`(根目录和子目录,标准 Git 语义);`ignore_dirs`、`include`、`exclude` 会在此基础上进一步过滤。 - 如果要直接创建或更新纯文本内容,请使用 [content/write](03-filesystem.md#write),不要使用 `add_resource`。资源导入和内容写入后都会自动刷新语义与 embedding。 @@ -213,6 +216,20 @@ curl -X POST http://localhost:1933/api/v1/resources \ "feishu_access_token": "u-..." } }' + +# 使用用户 token 自动刷新添加飞书文档 +curl -X POST http://localhost:1933/api/v1/resources \ + -H "Content-Type: application/json" \ + -H "X-API-Key: your-key" \ + -d '{ + "path": "https://example.feishu.cn/docx/doc_token", + "to": "viking://resources/feishu/doc", + "watch_interval": 1440, + "args": { + "feishu_access_token": "u-...", + "feishu_refresh_token": "r-..." + } + }' ``` **Python SDK** @@ -257,6 +274,17 @@ client.add_resource( "https://example.feishu.cn/docx/doc_token", args={"feishu_access_token": "u-..."}, ) + +# 使用用户 token 自动刷新添加飞书文档 +client.add_resource( + "https://example.feishu.cn/docx/doc_token", + to="viking://resources/feishu/doc", + watch_interval=1440, + args={ + "feishu_access_token": "u-...", + "feishu_refresh_token": "r-...", + }, +) ``` **CLI** @@ -283,6 +311,13 @@ ov add-resource https://github.com/example/repo.git --to viking://resources/my_r # 使用一次性用户 access token 添加飞书文档 ov add-resource https://example.feishu.cn/docx/doc_token --args feishu_access_token:u-... +# 使用用户 token 自动刷新添加飞书文档 +ov add-resource https://example.feishu.cn/docx/doc_token \ + --to viking://resources/feishu/doc \ + --watch-interval 1440 \ + --args feishu_access_token:u-... \ + --args feishu_refresh_token:r-... + # 添加到指定父目录(父目录必须存在) ov add-resource ./documents/guide.md --parent viking://resources/docs diff --git a/docs/zh/guides/06-mcp-integration.md b/docs/zh/guides/06-mcp-integration.md index f05a2dcc6a..a7635ced53 100644 --- a/docs/zh/guides/06-mcp-integration.md +++ b/docs/zh/guides/06-mcp-integration.md @@ -109,7 +109,7 @@ claude mcp add --transport http openviking \ | `read` | 读取一个或多个 `viking://` URI 的内容 | `uris`(单个字符串或数组) | | `list` | 列出 `viking://` 目录下的条目 | `uri`, `recursive`(可选) | | `store` | 存储消息到长期记忆(触发记忆提取) | `messages`(`{role, content}` 列表) | -| `add_resource` | 添加本地文件或 URL 作为资源(本地文件触发渐进式上传流) | `path`, `temp_file_id`(可选), `description`(可选), `watch_interval`(可选,分钟数 — 远程 URL 的自动刷新周期), `to`(可选,目标 `viking://resources/...` URI;`watch_interval > 0` 时若省略 `to`,watch 将自动绑定到本次 add 创建的资源 URI), `args`(可选,特定 parser 参数,例如飞书一次性用户 token 导入使用 `{"feishu_access_token":"u-..."}`) | +| `add_resource` | 添加本地文件或 URL 作为资源(本地文件触发渐进式上传流) | `path`, `temp_file_id`(可选), `description`(可选), `watch_interval`(可选,分钟数 — 远程 URL 的自动刷新周期), `to`(可选,目标 `viking://resources/...` URI;`watch_interval > 0` 时若省略 `to`,watch 将自动绑定到本次 add 创建的资源 URI), `args`(可选,特定 parser 参数,例如飞书一次性用户 token 导入使用 `{"feishu_access_token":"u-..."}`,飞书用户 token watch 使用 `{"feishu_access_token":"u-...","feishu_refresh_token":"r-..."}`) | | `list_watches` | 列出当前 Agent 可见的 watch 任务(自动刷新订阅),每行显示目标 URI、刷新间隔(分钟)、active/paused 状态以及下一次调度时间 | 无 | | `cancel_watch` | 按目标 URI 取消(删除)watch 任务。若需调整刷新周期或临时暂停,请取消后使用新的 `watch_interval` 重新添加 | `to_uri`(必须匹配 watch 任务的 `to` 值,例如 `viking://resources/...`) | | `grep` | 在 `viking://` 文件中进行正则内容搜索 | `uri`, `pattern`(字符串), `case_insensitive` | @@ -122,7 +122,7 @@ claude mcp add --transport http openviking \ > **注**:MCP 仅暴露 watch 管理的最小闭包(`list_watches` + `cancel_watch`)。pause / resume / trigger 和统一的 `update` 动作刻意不在此处暴露,请通过 REST `/api/v1/watches/*` 接口或 `ov task watch` CLI 使用上述操作。 -> 使用 `args.feishu_access_token` 的飞书/Lark 导入只支持一次性导入,不能和 `watch_interval > 0` 组合,因为 OpenViking 不保存也不刷新用户 token。 +> 未传 `args.feishu_access_token` 的飞书/Lark 导入保持现有应用/tenant token 行为,也支持 watch。飞书/Lark 一次性用户 token 导入只传 `args.feishu_access_token`;飞书/Lark 用户 token watch 还必须传 `args.feishu_refresh_token`,并要求 OpenViking 服务端配置同一个飞书应用凭证。 ### 添加本地文件资源(渐进式上传) diff --git a/openviking/resource/feishu_watch_auth.py b/openviking/resource/feishu_watch_auth.py new file mode 100644 index 0000000000..68d91ee728 --- /dev/null +++ b/openviking/resource/feishu_watch_auth.py @@ -0,0 +1,240 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 +"""Feishu user-token state for watch tasks.""" + +from __future__ import annotations + +import asyncio +import os +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Optional + +FEISHU_AUTH_PROVIDER = "feishu" +FEISHU_ACCESS_TOKEN_ARG = "feishu_access_token" +FEISHU_REFRESH_TOKEN_ARG = "feishu_refresh_token" +FEISHU_REFRESH_GRANT_TYPE = "refresh_token" +FEISHU_REFRESH_SKEW = timedelta(minutes=5) + + +@dataclass(frozen=True) +class FeishuAppCredentials: + app_id: str + app_secret: str + domain: str + request_timeout: float + + +@dataclass(frozen=True) +class FeishuRefreshedToken: + access_token: str + refresh_token: str + expires_in: int + + +class FeishuTokenRefreshError(Exception): + """Raised when a Feishu user token refresh fails.""" + + def __init__(self, message: str, *, permanent: bool = False): + super().__init__(message) + self.permanent = permanent + + +def load_feishu_app_credentials() -> FeishuAppCredentials: + """Load Feishu app credentials from OpenViking config or environment.""" + from openviking_cli.utils.config import get_openviking_config + + config = get_openviking_config().feishu + app_id = (config.app_id or os.getenv("FEISHU_APP_ID", "")).strip() + app_secret = (config.app_secret or os.getenv("FEISHU_APP_SECRET", "")).strip() + if not app_id or not app_secret: + raise ValueError( + "Feishu credentials not configured. Set FEISHU_APP_ID and " + "FEISHU_APP_SECRET environment variables, or configure in ov.conf." + ) + return FeishuAppCredentials( + app_id=app_id, + app_secret=app_secret, + domain=(config.domain or "https://open.feishu.cn").strip(), + request_timeout=float(config.request_timeout or 30.0), + ) + + +def create_feishu_auth_state(access_token: str, refresh_token: str) -> Dict[str, Any]: + """Create the private watch auth state for Feishu user-token watch tasks.""" + return { + "provider": FEISHU_AUTH_PROVIDER, + "access_token": access_token, + "refresh_token": refresh_token, + "expires_at": None, + } + + +def is_feishu_auth_state(auth_state: Optional[Dict[str, Any]]) -> bool: + return isinstance(auth_state, dict) and auth_state.get("provider") == FEISHU_AUTH_PROVIDER + + +def feishu_auth_state_needs_refresh( + auth_state: Dict[str, Any], + *, + now: Optional[datetime] = None, +) -> bool: + expires_at = auth_state.get("expires_at") + if not expires_at: + return True + + parsed = _parse_expires_at(expires_at) + if parsed is None: + return True + + current = now or datetime.now(timezone.utc) + if current.tzinfo is None: + current = current.replace(tzinfo=timezone.utc) + return parsed <= current + FEISHU_REFRESH_SKEW + + +def apply_feishu_refreshed_token( + auth_state: Dict[str, Any], + refreshed: FeishuRefreshedToken, + *, + now: Optional[datetime] = None, +) -> Dict[str, Any]: + current = now or datetime.now(timezone.utc) + if current.tzinfo is None: + current = current.replace(tzinfo=timezone.utc) + expires_at = current + timedelta(seconds=max(0, int(refreshed.expires_in))) + return { + **auth_state, + "provider": FEISHU_AUTH_PROVIDER, + "access_token": refreshed.access_token, + "refresh_token": refreshed.refresh_token, + "expires_at": expires_at.isoformat(), + } + + +class FeishuOAuthClient: + """Small wrapper around lark-oapi user token refresh.""" + + def __init__(self, credentials: FeishuAppCredentials): + self._credentials = credentials + self._client = None + + @classmethod + def from_config(cls) -> "FeishuOAuthClient": + return cls(load_feishu_app_credentials()) + + async def refresh_user_access_token(self, refresh_token: str) -> FeishuRefreshedToken: + if not isinstance(refresh_token, str) or not refresh_token.strip(): + raise FeishuTokenRefreshError( + "Feishu refresh token is missing for watch task.", + permanent=True, + ) + return await asyncio.to_thread(self._refresh_user_access_token_sync, refresh_token.strip()) + + def _refresh_user_access_token_sync(self, refresh_token: str) -> FeishuRefreshedToken: + try: + from lark_oapi.api.authen.v1 import ( + CreateRefreshAccessTokenRequest, + CreateRefreshAccessTokenRequestBody, + ) + except ImportError as exc: + raise FeishuTokenRefreshError( + "lark-oapi is required to refresh Feishu user tokens. " + "Install it with: pip install 'openviking[bot-feishu]'", + permanent=True, + ) from exc + + request = ( + CreateRefreshAccessTokenRequest.builder() + .request_body( + CreateRefreshAccessTokenRequestBody.builder() + .grant_type(FEISHU_REFRESH_GRANT_TYPE) + .refresh_token(refresh_token) + .build() + ) + .build() + ) + + try: + response = self._get_client().authen.v1.refresh_access_token.create(request) + except FeishuTokenRefreshError: + raise + except Exception as exc: + raise FeishuTokenRefreshError( + f"Failed to refresh Feishu user token: {exc}", + permanent=False, + ) from exc + + if not response.success(): + code = getattr(response, "code", None) + msg = getattr(response, "msg", "") or "" + raise FeishuTokenRefreshError( + f"Feishu user token refresh failed: code={code}, msg={msg}", + permanent=_is_permanent_refresh_error(code, msg), + ) + + data = getattr(response, "data", None) + access_token = (getattr(data, "access_token", None) or "").strip() + new_refresh_token = (getattr(data, "refresh_token", None) or refresh_token).strip() + expires_in = getattr(data, "expires_in", None) + if not access_token or not new_refresh_token or not isinstance(expires_in, int): + raise FeishuTokenRefreshError( + "Feishu user token refresh response is missing access_token, " + "refresh_token, or expires_in.", + permanent=False, + ) + + return FeishuRefreshedToken( + access_token=access_token, + refresh_token=new_refresh_token, + expires_in=expires_in, + ) + + def _get_client(self): + if self._client is None: + try: + import lark_oapi as lark + except ImportError as exc: + raise FeishuTokenRefreshError( + "lark-oapi is required to refresh Feishu user tokens. " + "Install it with: pip install 'openviking[bot-feishu]'", + permanent=True, + ) from exc + + self._client = ( + lark.Client.builder() + .app_id(self._credentials.app_id) + .app_secret(self._credentials.app_secret) + .domain(self._credentials.domain) + .timeout(self._credentials.request_timeout) + .build() + ) + return self._client + + +def _parse_expires_at(value: Any) -> Optional[datetime]: + if not isinstance(value, str) or not value.strip(): + return None + try: + parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) + except ValueError: + return None + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed + + +def _is_permanent_refresh_error(code: Any, msg: str) -> bool: + text = f"{code or ''} {msg or ''}".lower() + permanent_terms = ( + "invalid", + "expired", + "revoked", + "unauthorized", + "not exist", + "not found", + "mismatch", + "refresh token", + "refresh_token", + ) + return any(term in text for term in permanent_terms) diff --git a/openviking/resource/watch_manager.py b/openviking/resource/watch_manager.py index cdad6d94f2..758d5d2cb2 100644 --- a/openviking/resource/watch_manager.py +++ b/openviking/resource/watch_manager.py @@ -24,6 +24,8 @@ logger = get_logger(__name__) +_UNSET = object() + class WatchTask(BaseModel): """Resource monitoring task data model.""" @@ -42,6 +44,9 @@ class WatchTask(BaseModel): processor_kwargs: Dict[str, Any] = Field( default_factory=dict, description="Extra kwargs forwarded to processor" ) + auth_state: Optional[Dict[str, Any]] = Field( + default=None, description="Private authentication state for scheduled re-processing" + ) created_at: datetime = Field(default_factory=datetime.now, description="Task creation time") last_execution_time: Optional[datetime] = Field(None, description="Last execution time") next_execution_time: Optional[datetime] = Field(None, description="Next execution time") @@ -55,7 +60,7 @@ class Config: extra = "ignore" def to_dict(self) -> Dict[str, Any]: - """Convert task to dictionary.""" + """Convert task to public dictionary.""" return { "task_id": self.task_id, "path": self.path, @@ -80,9 +85,17 @@ def to_dict(self) -> Dict[str, Any]: "original_role": self.original_role, } + def to_storage_dict(self) -> Dict[str, Any]: + """Convert task to dictionary for watch-task persistence.""" + data = self.to_dict() + if self.auth_state is not None: + data["auth_state"] = self.auth_state + return data + @classmethod def from_dict(cls, data: Dict[str, Any]) -> "WatchTask": """Create task from dictionary.""" + data = dict(data) if isinstance(data.get("created_at"), str): data["created_at"] = datetime.fromisoformat(data["created_at"]) if isinstance(data.get("last_execution_time"), str): @@ -91,6 +104,8 @@ def from_dict(cls, data: Dict[str, Any]) -> "WatchTask": data["next_execution_time"] = datetime.fromisoformat(data["next_execution_time"]) if data.get("processor_kwargs") is None: data["processor_kwargs"] = {} + if data.get("auth_state") is not None and not isinstance(data.get("auth_state"), dict): + data["auth_state"] = None return cls(**data) def calculate_next_execution_time(self) -> datetime: @@ -231,7 +246,7 @@ async def _save_tasks(self) -> None: ctx = RequestContext(user=UserIdentifier.the_default_user(), role=Role.ROOT) data = { - "tasks": [task.to_dict() for task in self._tasks.values()], + "tasks": [task.to_storage_dict() for task in self._tasks.values()], "updated_at": datetime.now().isoformat(), } @@ -341,6 +356,7 @@ async def create_task( build_index: bool = True, summarize: bool = False, processor_kwargs: Optional[Dict[str, Any]] = None, + auth_state: Optional[Dict[str, Any]] = None, ) -> WatchTask: """Create a new monitoring task. @@ -383,6 +399,7 @@ async def create_task( build_index=build_index, summarize=summarize, processor_kwargs=processor_kwargs or {}, + auth_state=auth_state, account_id=account_id, user_id=user_id, original_role=original_role, @@ -416,6 +433,7 @@ async def update_task( build_index: Optional[bool] = None, summarize: Optional[bool] = None, processor_kwargs: Optional[Dict[str, Any]] = None, + auth_state: Any = _UNSET, is_active: Optional[bool] = None, ) -> WatchTask: """Update an existing monitoring task. @@ -484,6 +502,8 @@ async def update_task( task.summarize = summarize if processor_kwargs is not None: task.processor_kwargs = processor_kwargs + if auth_state is not _UNSET: + task.auth_state = auth_state if is_active is not None: task.is_active = is_active @@ -512,6 +532,19 @@ async def update_task( logger.info(f"[WatchManager] Updated task {task_id} by user {account_id}/{user_id}") return task + async def update_auth_state( + self, + task_id: str, + auth_state: Optional[Dict[str, Any]], + ) -> None: + """Update private auth state for an existing watch task.""" + async with self._lock: + task = self._tasks.get(task_id) + if not task: + return + task.auth_state = auth_state + await self._save_tasks() + async def delete_task( self, task_id: str, diff --git a/openviking/resource/watch_scheduler.py b/openviking/resource/watch_scheduler.py index d1f32a1e27..01f5953528 100644 --- a/openviking/resource/watch_scheduler.py +++ b/openviking/resource/watch_scheduler.py @@ -8,8 +8,15 @@ import asyncio from datetime import datetime -from typing import Any, Optional, Set - +from typing import Any, Dict, Optional, Set + +from openviking.resource.feishu_watch_auth import ( + FeishuOAuthClient, + FeishuTokenRefreshError, + apply_feishu_refreshed_token, + feishu_auth_state_needs_refresh, + is_feishu_auth_state, +) from openviking.resource.watch_manager import WatchManager from openviking.server.identity import RequestContext, Role from openviking.service.resource_service import ResourceService @@ -58,6 +65,7 @@ def __init__( self._scheduler_task: Optional[asyncio.Task] = None self._executing_tasks: Set[str] = set() self._lock = asyncio.Lock() + self._feishu_oauth_client: Optional[Any] = None @property def watch_manager(self) -> Optional[WatchManager]: @@ -244,24 +252,41 @@ async def _execute_task(self, task) -> None: processor_kwargs = dict(getattr(task, "processor_kwargs", {}) or {}) processor_kwargs.pop("build_index", None) processor_kwargs.pop("summarize", None) - result = await self._resource_service.add_resource( - path=task.path, - ctx=ctx, - to=task.to_uri, - parent=task.parent_uri, - reason=task.reason, - instruction=task.instruction, - build_index=getattr(task, "build_index", True), - summarize=getattr(task, "summarize", False), - watch_interval=task.watch_interval, - skip_watch_management=True, - **processor_kwargs, - ) - - logger.info( - f"[WatchScheduler] Task {task.task_id} executed successfully, " - f"result: {result.get('root_uri', 'N/A')}" - ) + auth_state = getattr(task, "auth_state", None) + if is_feishu_auth_state(auth_state): + try: + auth_state = await self._prepare_feishu_auth_state(task, auth_state) + processor_kwargs["feishu_access_token"] = auth_state["access_token"] + except FeishuTokenRefreshError as e: + if e.permanent: + should_deactivate = True + deactivation_reason = str(e) + logger.error( + f"[WatchScheduler] Task {task.task_id} permanent Feishu " + f"token refresh failure: {e}. Deactivating task." + ) + else: + raise + + if not should_deactivate: + result = await self._resource_service.add_resource( + path=task.path, + ctx=ctx, + to=task.to_uri, + parent=task.parent_uri, + reason=task.reason, + instruction=task.instruction, + build_index=getattr(task, "build_index", True), + summarize=getattr(task, "summarize", False), + watch_interval=task.watch_interval, + skip_watch_management=True, + **processor_kwargs, + ) + + logger.info( + f"[WatchScheduler] Task {task.task_id} executed successfully, " + f"result: {result.get('root_uri', 'N/A')}" + ) except asyncio.CancelledError: cancelled = True @@ -318,6 +343,26 @@ async def _discard_executing(self, task_id: str) -> None: async with self._lock: self._executing_tasks.discard(task_id) + async def _prepare_feishu_auth_state( + self, + task, + auth_state: Dict[str, Any], + ) -> Dict[str, Any]: + if not feishu_auth_state_needs_refresh(auth_state): + return auth_state + + refresh_token = auth_state.get("refresh_token") + refreshed = await self._get_feishu_oauth_client().refresh_user_access_token(refresh_token) + updated = apply_feishu_refreshed_token(auth_state, refreshed) + if self._watch_manager is not None: + await self._watch_manager.update_auth_state(task.task_id, updated) + return updated + + def _get_feishu_oauth_client(self): + if self._feishu_oauth_client is None: + self._feishu_oauth_client = FeishuOAuthClient.from_config() + return self._feishu_oauth_client + def _check_resource_exists(self, path: str) -> bool: """Check if a resource path exists. diff --git a/openviking/server/mcp_endpoint.py b/openviking/server/mcp_endpoint.py index 036f83df27..7dc7589ab5 100644 --- a/openviking/server/mcp_endpoint.py +++ b/openviking/server/mcp_endpoint.py @@ -473,8 +473,9 @@ async def add_resource( to: Target URI under viking://resources/ (e.g. "viking://resources/volcengine/OpenViking"). Leave empty to let the system derive a URI from the source. - args: Parser-specific import options. For Feishu user-token imports, pass - {"feishu_access_token": "..."}. + args: Parser-specific import options. For Feishu one-time user-token imports, + pass {"feishu_access_token": "..."}. For Feishu user-token watches, + pass {"feishu_access_token": "...", "feishu_refresh_token": "..."}. """ from openviking.server.local_input_guard import require_remote_resource_source diff --git a/openviking/server/routers/resources.py b/openviking/server/routers/resources.py index 725f2586f0..6c2432b746 100644 --- a/openviking/server/routers/resources.py +++ b/openviking/server/routers/resources.py @@ -50,8 +50,9 @@ class AddResourceRequest(BaseModel): exclude: Glob pattern for files to exclude during parsing. directly_upload_media: Whether to directly upload media files. Default is True. preserve_structure: Whether to preserve directory structure when adding directories. - args: Parser-specific import options. For Feishu user-token imports, pass - {"feishu_access_token": "..."}. + args: Parser-specific import options. For Feishu one-time user-token imports, + pass {"feishu_access_token": "..."}. For Feishu user-token watches, + pass {"feishu_access_token": "...", "feishu_refresh_token": "..."}. watch_interval: Watch interval in minutes for automatic resource monitoring. - watch_interval > 0: Creates or updates a watch task. The resource will be automatically re-processed at the specified interval. diff --git a/openviking/service/resource_service.py b/openviking/service/resource_service.py index 6be559aab1..9a05653d8e 100644 --- a/openviking/service/resource_service.py +++ b/openviking/service/resource_service.py @@ -18,6 +18,12 @@ from openviking.core.path_variables import resolve_path_variables from openviking.core.uri_validation import validate_optional_viking_uri +from openviking.resource.feishu_watch_auth import ( + FEISHU_ACCESS_TOKEN_ARG, + FEISHU_REFRESH_TOKEN_ARG, + create_feishu_auth_state, + load_feishu_app_credentials, +) from openviking.server.identity import RequestContext from openviking.server.local_input_guard import ( is_remote_resource_source, @@ -96,6 +102,12 @@ class _ResourceSourceInfo: source_format: Optional[str] = None +@dataclass +class _NormalizedAddResourceArgs: + processor_kwargs: Dict[str, Any] + watch_auth_state: Optional[Dict[str, Any]] = None + + class ResourceService: """Resource management service.""" @@ -149,13 +161,13 @@ def _normalize_add_resource_args( args: Optional[Dict[str, Any]], *, watch_interval: float, - ) -> Dict[str, Any]: + ) -> _NormalizedAddResourceArgs: if args is None: - return {} + return _NormalizedAddResourceArgs({}) if not isinstance(args, dict): raise InvalidArgumentError("args must be an object.") if not args: - return {} + return _NormalizedAddResourceArgs({}) reserved = sorted(set(args).intersection(_ADD_RESOURCE_ARGS_RESERVED_FIELDS)) if reserved: @@ -164,18 +176,42 @@ def _normalize_add_resource_args( ) normalized = dict(args) - token = normalized.get("feishu_access_token") + token = normalized.get(FEISHU_ACCESS_TOKEN_ARG) + refresh_token = normalized.pop(FEISHU_REFRESH_TOKEN_ARG, None) + watch_auth_state = None if token is not None: if not isinstance(token, str) or not token.strip(): raise InvalidArgumentError("args.feishu_access_token must be a non-empty string.") + token = token.strip() + normalized[FEISHU_ACCESS_TOKEN_ARG] = token if watch_interval > 0: + if not isinstance(refresh_token, str) or not refresh_token.strip(): + raise InvalidArgumentError( + "args.feishu_refresh_token must be a non-empty string when " + "args.feishu_access_token is used with watch_interval > 0." + ) + self._ensure_feishu_credentials_for_watch() + watch_auth_state = create_feishu_auth_state(token, refresh_token.strip()) + elif refresh_token is not None: raise InvalidArgumentError( - "args.feishu_access_token only supports one-time import; " - "watch_interval must be 0." + "args.feishu_refresh_token is only supported with " + "args.feishu_access_token and watch_interval > 0." ) - normalized["feishu_access_token"] = token.strip() + elif refresh_token is not None: + raise InvalidArgumentError( + "args.feishu_refresh_token requires args.feishu_access_token." + ) + + return _NormalizedAddResourceArgs(normalized, watch_auth_state) - return normalized + def _ensure_feishu_credentials_for_watch(self) -> None: + try: + load_feishu_app_credentials() + except Exception as exc: + raise InvalidArgumentError( + "Feishu user-token watch requires FEISHU_APP_ID and " + "FEISHU_APP_SECRET, or feishu.app_id and feishu.app_secret in ov.conf." + ) from exc def _ensure_initialized(self) -> None: """Ensure all dependencies are initialized.""" @@ -216,7 +252,8 @@ async def enqueue_git_add_resource( ) -> Dict[str, Any]: """Start background ingestion for Git repositories while reserving the target URI.""" self._ensure_initialized() - kwargs.update(self._normalize_add_resource_args(args, watch_interval=watch_interval)) + normalized_args = self._normalize_add_resource_args(args, watch_interval=watch_interval) + kwargs.update(normalized_args.processor_kwargs) if to: to = resolve_path_variables(to) @@ -531,7 +568,8 @@ async def add_resource( InvalidArgumentError: If the URI scope is not 'resources' """ self._ensure_initialized() - kwargs.update(self._normalize_add_resource_args(args, watch_interval=watch_interval)) + normalized_args = self._normalize_add_resource_args(args, watch_interval=watch_interval) + kwargs.update(normalized_args.processor_kwargs) if not wait and is_git_repo_url(path): return await self.enqueue_git_add_resource( path=path, @@ -671,6 +709,8 @@ async def add_resource( ) try: processor_kwargs = self._sanitize_watch_processor_kwargs(kwargs) + if normalized_args.watch_auth_state is not None: + processor_kwargs.pop(FEISHU_ACCESS_TOKEN_ARG, None) await self._handle_watch_task_creation( path=path, to_uri=watch_to, @@ -681,6 +721,7 @@ async def add_resource( build_index=build_index, summarize=summarize, processor_kwargs=processor_kwargs, + auth_state=normalized_args.watch_auth_state, ctx=ctx, ) except ConflictError: @@ -794,6 +835,7 @@ async def _handle_watch_task_creation( build_index: bool, summarize: bool, processor_kwargs: Dict[str, Any], + auth_state: Optional[Dict[str, Any]], ctx: RequestContext, ) -> None: """Handle creation or update of watch task. @@ -841,6 +883,7 @@ async def _handle_watch_task_creation( build_index=build_index, summarize=summarize, processor_kwargs=processor_kwargs, + auth_state=auth_state, is_active=True, ) logger.info( @@ -860,6 +903,7 @@ async def _handle_watch_task_creation( build_index=build_index, summarize=summarize, processor_kwargs=processor_kwargs, + auth_state=auth_state, ) logger.info(f"[ResourceService] Created watch task {task.task_id} for {to_uri}") diff --git a/tests/resource/test_feishu_watch_auth.py b/tests/resource/test_feishu_watch_auth.py new file mode 100644 index 0000000000..b47550a7b6 --- /dev/null +++ b/tests/resource/test_feishu_watch_auth.py @@ -0,0 +1,137 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 +"""Tests for Feishu watch auth helpers.""" + +import sys +from datetime import datetime, timedelta, timezone +from types import ModuleType, SimpleNamespace + +from openviking.resource.feishu_watch_auth import ( + FEISHU_REFRESH_GRANT_TYPE, + FeishuAppCredentials, + FeishuOAuthClient, + FeishuRefreshedToken, + apply_feishu_refreshed_token, + create_feishu_auth_state, + feishu_auth_state_needs_refresh, +) + + +class _FakeRefreshAccessTokenRequest: + @staticmethod + def builder(): + return _FakeRefreshAccessTokenRequestBuilder() + + +class _FakeRefreshAccessTokenRequestBuilder: + def __init__(self): + self._request = SimpleNamespace(body=None) + + def request_body(self, body): + self._request.body = body + return self + + def build(self): + return self._request + + +class _FakeRefreshAccessTokenRequestBody: + @staticmethod + def builder(): + return _FakeRefreshAccessTokenRequestBodyBuilder() + + +class _FakeRefreshAccessTokenRequestBodyBuilder: + def __init__(self): + self._body = SimpleNamespace(grant_type=None, refresh_token=None) + + def grant_type(self, grant_type): + self._body.grant_type = grant_type + return self + + def refresh_token(self, refresh_token): + self._body.refresh_token = refresh_token + return self + + def build(self): + return self._body + + +class _SuccessRefreshResponse: + code = 0 + msg = "" + + def __init__(self): + self.data = SimpleNamespace( + access_token="u-new", + refresh_token="r-new", + expires_in=7200, + ) + + @staticmethod + def success(): + return True + + +class _FakeRefreshAccessToken: + def __init__(self): + self.request = None + + def create(self, request): + self.request = request + return _SuccessRefreshResponse() + + +def test_feishu_oauth_client_refreshes_user_access_token_with_sdk_request(monkeypatch): + authen_v1 = ModuleType("lark_oapi.api.authen.v1") + authen_v1.CreateRefreshAccessTokenRequest = _FakeRefreshAccessTokenRequest + authen_v1.CreateRefreshAccessTokenRequestBody = _FakeRefreshAccessTokenRequestBody + monkeypatch.setitem(sys.modules, "lark_oapi.api.authen.v1", authen_v1) + + refresh_access_token = _FakeRefreshAccessToken() + client = FeishuOAuthClient( + FeishuAppCredentials( + app_id="app-id", + app_secret="app-secret", + domain="https://open.feishu.cn", + request_timeout=30.0, + ) + ) + client._client = SimpleNamespace( + authen=SimpleNamespace( + v1=SimpleNamespace(refresh_access_token=refresh_access_token), + ) + ) + + refreshed = client._refresh_user_access_token_sync("r-old") + + assert refreshed == FeishuRefreshedToken( + access_token="u-new", + refresh_token="r-new", + expires_in=7200, + ) + assert refresh_access_token.request.body.grant_type == FEISHU_REFRESH_GRANT_TYPE + assert refresh_access_token.request.body.refresh_token == "r-old" + + +def test_feishu_auth_state_refresh_window(): + now = datetime(2026, 6, 10, 12, 0, tzinfo=timezone.utc) + state = create_feishu_auth_state("u-old", "r-old") + + assert feishu_auth_state_needs_refresh(state, now=now) is True + + refreshed = apply_feishu_refreshed_token( + state, + FeishuRefreshedToken(access_token="u-new", refresh_token="r-new", expires_in=7200), + now=now, + ) + + assert refreshed["access_token"] == "u-new" + assert refreshed["refresh_token"] == "r-new" + assert feishu_auth_state_needs_refresh(refreshed, now=now) is False + + near_expiry = { + **refreshed, + "expires_at": (now + timedelta(minutes=4)).isoformat(), + } + assert feishu_auth_state_needs_refresh(near_expiry, now=now) is True diff --git a/tests/resource/test_watch_manager.py b/tests/resource/test_watch_manager.py index ea6a62c4a2..ca71023048 100644 --- a/tests/resource/test_watch_manager.py +++ b/tests/resource/test_watch_manager.py @@ -133,6 +133,12 @@ def test_to_dict(self): task_id="test-id", path="/test/path", to_uri="viking://test", + auth_state={ + "provider": "feishu", + "access_token": "u-test", + "refresh_token": "r-test", + "expires_at": None, + }, created_at=now, ) @@ -143,6 +149,28 @@ def test_to_dict(self): assert data["to_uri"] == "viking://test" assert data["created_at"] == now.isoformat() assert data["is_active"] is True + assert "auth_state" not in data + + def test_to_storage_dict_includes_private_auth_state(self): + task = WatchTask( + task_id="test-id", + path="/test/path", + auth_state={ + "provider": "feishu", + "access_token": "u-test", + "refresh_token": "r-test", + "expires_at": None, + }, + ) + + data = task.to_storage_dict() + + assert data["auth_state"] == { + "provider": "feishu", + "access_token": "u-test", + "refresh_token": "r-test", + "expires_at": None, + } def test_from_dict(self): """Test creating task from dictionary.""" @@ -159,6 +187,12 @@ def test_from_dict(self): "last_execution_time": now.isoformat(), "next_execution_time": (now + timedelta(minutes=45)).isoformat(), "is_active": False, + "auth_state": { + "provider": "feishu", + "access_token": "u-test", + "refresh_token": "r-test", + "expires_at": None, + }, } task = WatchTask.from_dict(data) @@ -170,6 +204,7 @@ def test_from_dict(self): assert task.is_active is False assert task.created_at == now assert task.last_execution_time == now + assert task.auth_state == data["auth_state"] def test_calculate_next_execution_time(self): """Test calculating next execution time.""" @@ -222,6 +257,32 @@ async def test_create_task(self, watch_manager: WatchManager): assert task.is_active is True assert task.next_execution_time is not None + @pytest.mark.asyncio + async def test_auth_state_persisted_and_hidden_from_public_dict( + self, mock_viking_fs: MockVikingFS + ): + manager1 = WatchManager(viking_fs=mock_viking_fs) + await manager1.initialize() + task = await manager1.create_task( + path="https://example.feishu.cn/docx/doc_token", + to_uri="viking://resources/feishu", + watch_interval=30.0, + auth_state={ + "provider": "feishu", + "access_token": "u-test", + "refresh_token": "r-test", + "expires_at": None, + }, + ) + + manager2 = WatchManager(viking_fs=mock_viking_fs) + await manager2.initialize() + loaded = await manager2.get_task(task.task_id) + + assert loaded is not None + assert loaded.auth_state == task.auth_state + assert "auth_state" not in loaded.to_dict() + @pytest.mark.asyncio async def test_create_task_without_path_raises(self, watch_manager: WatchManager): """Test that creating a task without path raises error.""" diff --git a/tests/server/test_api_watches.py b/tests/server/test_api_watches.py index 59d468d7f2..699867cbe2 100644 --- a/tests/server/test_api_watches.py +++ b/tests/server/test_api_watches.py @@ -25,6 +25,7 @@ async def _seed( role="user", interval=60.0, path="https://example.com/foo", + auth_state=None, ): return await wm.create_task( path=path, @@ -33,6 +34,7 @@ async def _seed( original_role=role, to_uri=to_uri, watch_interval=interval, + auth_state=auth_state, ) @@ -152,6 +154,29 @@ async def test_dual_key_matching_accepted(client: httpx.AsyncClient, watch_manag assert resp.json()["result"]["task_id"] == task.task_id +async def test_watch_api_hides_private_auth_state(client: httpx.AsyncClient, watch_manager): + task = await _seed( + watch_manager, + to_uri="viking://resources/test/feishu-user-watch", + auth_state={ + "provider": "feishu", + "access_token": "u-test", + "refresh_token": "r-test", + "expires_at": None, + }, + ) + + list_resp = await client.get("/api/v1/watches") + assert list_resp.status_code == 200 + returned = list_resp.json()["result"]["tasks"][0] + assert returned["task_id"] == task.task_id + assert "auth_state" not in returned + + get_resp = await client.get(f"/api/v1/watches/{task.task_id}") + assert get_resp.status_code == 200 + assert "auth_state" not in get_resp.json()["result"] + + async def test_dual_key_mismatch_returns_400(client: httpx.AsyncClient, watch_manager): """When {task_id} and ?to_uri= refer to DIFFERENT tasks, return 400.""" a = await _seed(watch_manager, to_uri="viking://resources/test/dual-a") diff --git a/tests/service/test_resource_service_watch.py b/tests/service/test_resource_service_watch.py index 390eef8e6e..ebb3fb2622 100644 --- a/tests/service/test_resource_service_watch.py +++ b/tests/service/test_resource_service_watch.py @@ -10,6 +10,7 @@ from openviking.resource.watch_manager import WatchManager from openviking.server.identity import RequestContext, Role +from openviking.service import resource_service as resource_service_module from openviking.service.resource_service import ResourceService from openviking_cli.exceptions import ConflictError, InvalidArgumentError from openviking_cli.session.user_id import UserIdentifier @@ -233,7 +234,7 @@ async def test_forwards_args_to_resource_processor( async def test_rejects_feishu_access_token_with_watch( self, resource_service: ResourceService, request_context: RequestContext ): - with pytest.raises(InvalidArgumentError, match="one-time import"): + with pytest.raises(InvalidArgumentError, match="feishu_refresh_token"): await resource_service.add_resource( path="/test/path", ctx=request_context, @@ -241,6 +242,99 @@ async def test_rejects_feishu_access_token_with_watch( args={"feishu_access_token": "u-test"}, ) + @pytest.mark.asyncio + async def test_rejects_feishu_user_token_watch_without_app_config( + self, + monkeypatch: pytest.MonkeyPatch, + resource_service: ResourceService, + request_context: RequestContext, + ): + def raise_missing_credentials(): + raise ValueError("missing Feishu config") + + monkeypatch.setattr( + resource_service_module, + "load_feishu_app_credentials", + raise_missing_credentials, + ) + + with pytest.raises(InvalidArgumentError, match="Feishu user-token watch requires"): + await resource_service.add_resource( + path="/test/path", + ctx=request_context, + watch_interval=30, + args={ + "feishu_access_token": "u-test", + "feishu_refresh_token": "r-test", + }, + ) + + @pytest.mark.asyncio + async def test_feishu_user_token_watch_stores_private_auth_state( + self, + monkeypatch: pytest.MonkeyPatch, + resource_service: ResourceService, + request_context: RequestContext, + ): + monkeypatch.setattr( + resource_service_module, + "load_feishu_app_credentials", + lambda: object(), + ) + to_uri = "viking://resources/feishu_user_watch" + + await resource_service.add_resource( + path="https://example.feishu.cn/docx/doc_token", + ctx=request_context, + to=to_uri, + watch_interval=30, + args={ + "feishu_access_token": " u-test ", + "feishu_refresh_token": " r-test ", + }, + ) + + processor = resource_service._resource_processor + assert processor.calls[-1]["feishu_access_token"] == "u-test" + assert "feishu_refresh_token" not in processor.calls[-1] + + task = await get_task_by_uri(resource_service, to_uri, request_context) + assert task is not None + assert task.processor_kwargs == {} + assert task.auth_state == { + "provider": "feishu", + "access_token": "u-test", + "refresh_token": "r-test", + "expires_at": None, + } + assert "auth_state" not in task.to_dict() + + @pytest.mark.asyncio + async def test_rejects_feishu_refresh_token_without_watch( + self, resource_service: ResourceService, request_context: RequestContext + ): + with pytest.raises(InvalidArgumentError, match="only supported"): + await resource_service.add_resource( + path="/test/path", + ctx=request_context, + args={ + "feishu_access_token": "u-test", + "feishu_refresh_token": "r-test", + }, + ) + + @pytest.mark.asyncio + async def test_rejects_feishu_refresh_token_without_access_token( + self, resource_service: ResourceService, request_context: RequestContext + ): + with pytest.raises(InvalidArgumentError, match="requires args.feishu_access_token"): + await resource_service.add_resource( + path="/test/path", + ctx=request_context, + watch_interval=30, + args={"feishu_refresh_token": "r-test"}, + ) + @pytest.mark.asyncio async def test_rejects_invalid_feishu_access_token( self, resource_service: ResourceService, request_context: RequestContext diff --git a/tests/service/test_watch_recovery.py b/tests/service/test_watch_recovery.py index 7f5a40821f..9e578e7c00 100644 --- a/tests/service/test_watch_recovery.py +++ b/tests/service/test_watch_recovery.py @@ -11,6 +11,7 @@ import pytest import pytest_asyncio +from openviking.resource.feishu_watch_auth import FeishuRefreshedToken, FeishuTokenRefreshError from openviking.resource.watch_manager import WatchManager from openviking.resource.watch_scheduler import WatchScheduler from openviking.server.identity import RequestContext, Role @@ -51,10 +52,12 @@ class MockResourceProcessor: def __init__(self): self.call_count = 0 self.processed_paths = [] + self.calls = [] async def process_resource(self, **kwargs): self.call_count += 1 self.processed_paths.append(kwargs.get("path")) + self.calls.append(kwargs) return {"root_uri": kwargs.get("to", "viking://resources/test")} @@ -71,6 +74,25 @@ class MockVikingDB: pass +class FakeFeishuOAuthClient: + def __init__( + self, *, refreshed: FeishuRefreshedToken | None = None, error: Exception | None = None + ): + self.refreshed = refreshed or FeishuRefreshedToken( + access_token="u-new", + refresh_token="r-new", + expires_in=7200, + ) + self.error = error + self.calls = [] + + async def refresh_user_access_token(self, refresh_token: str) -> FeishuRefreshedToken: + self.calls.append(refresh_token) + if self.error is not None: + raise self.error + return self.refreshed + + @pytest_asyncio.fixture async def temp_storage(tmp_path: Path) -> AsyncGenerator[Path, None]: """Create temporary storage directory.""" @@ -398,6 +420,114 @@ async def test_url_resources_always_considered_existing( assert updated_task.is_active is True assert resource_processor.call_count == 1 + @pytest.mark.asyncio + async def test_feishu_user_token_watch_refreshes_before_execution( + self, temp_storage: Path, request_context: RequestContext + ): + resource_processor = MockResourceProcessor() + resource_service = ResourceService( + vikingdb=MockVikingDB(), + viking_fs=MockVikingFS(root_path=str(temp_storage)), + resource_processor=resource_processor, + skill_processor=MockSkillProcessor(), + watch_scheduler=None, + ) + scheduler = WatchScheduler(resource_service=resource_service, viking_fs=None) + await scheduler.start() + scheduler._feishu_oauth_client = FakeFeishuOAuthClient() + watch_manager = scheduler.watch_manager + + task = await watch_manager.create_task( + path="https://example.feishu.cn/docx/doc_token", + to_uri="viking://resources/feishu-user-watch", + watch_interval=30.0, + auth_state={ + "provider": "feishu", + "access_token": "u-old", + "refresh_token": "r-old", + "expires_at": None, + }, + ) + + await scheduler._execute_task(task) + + assert scheduler._feishu_oauth_client.calls == ["r-old"] + assert resource_processor.call_count == 1 + assert resource_processor.calls[-1]["feishu_access_token"] == "u-new" + + updated_task = await watch_manager.get_task(task.task_id) + assert updated_task is not None + assert updated_task.auth_state["access_token"] == "u-new" + assert updated_task.auth_state["refresh_token"] == "r-new" + assert updated_task.auth_state["expires_at"] is not None + + @pytest.mark.asyncio + async def test_app_token_watch_does_not_refresh_feishu_user_token( + self, temp_storage: Path, request_context: RequestContext + ): + resource_processor = MockResourceProcessor() + resource_service = ResourceService( + vikingdb=MockVikingDB(), + viking_fs=MockVikingFS(root_path=str(temp_storage)), + resource_processor=resource_processor, + skill_processor=MockSkillProcessor(), + watch_scheduler=None, + ) + scheduler = WatchScheduler(resource_service=resource_service, viking_fs=None) + await scheduler.start() + scheduler._feishu_oauth_client = FakeFeishuOAuthClient() + watch_manager = scheduler.watch_manager + + task = await watch_manager.create_task( + path="https://example.feishu.cn/docx/doc_token", + to_uri="viking://resources/feishu-app-watch", + watch_interval=30.0, + ) + + await scheduler._execute_task(task) + + assert scheduler._feishu_oauth_client.calls == [] + assert resource_processor.call_count == 1 + assert "feishu_access_token" not in resource_processor.calls[-1] + + @pytest.mark.asyncio + async def test_permanent_feishu_refresh_failure_deactivates_task( + self, temp_storage: Path, request_context: RequestContext + ): + resource_processor = MockResourceProcessor() + resource_service = ResourceService( + vikingdb=MockVikingDB(), + viking_fs=MockVikingFS(root_path=str(temp_storage)), + resource_processor=resource_processor, + skill_processor=MockSkillProcessor(), + watch_scheduler=None, + ) + scheduler = WatchScheduler(resource_service=resource_service, viking_fs=None) + await scheduler.start() + scheduler._feishu_oauth_client = FakeFeishuOAuthClient( + error=FeishuTokenRefreshError("invalid refresh token", permanent=True) + ) + watch_manager = scheduler.watch_manager + + task = await watch_manager.create_task( + path="https://example.feishu.cn/docx/doc_token", + to_uri="viking://resources/feishu-invalid-token", + watch_interval=30.0, + auth_state={ + "provider": "feishu", + "access_token": "u-old", + "refresh_token": "r-old", + "expires_at": None, + }, + ) + + await scheduler._execute_task(task) + + updated_task = await watch_manager.get_task(task.task_id) + assert updated_task is not None + assert updated_task.is_active is False + assert resource_processor.call_count == 0 + class TestSchedulerIntegration: """Integration tests for WatchScheduler with WatchManager."""