feat: add support to medatata to messages sent from the agents#415
feat: add support to medatata to messages sent from the agents#415
Conversation
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
There was a problem hiding this comment.
Pull request overview
Adds a richer “new conversation” message type so agent-initiated outbound messages can carry attachment metadata (e.g., generated images/files) to connectors, and updates connectors to consume it.
Changes:
- Introduce
types.ConversationMessage(OpenAI message + metadata) and update agent subscriber APIs to use it. - Accumulate action metadata onto
job.Metadataand send it along when thenew_conversationtool is invoked. - Update connectors (Telegram/Slack/Matrix/IRC/Email/Discord) to use the new subscriber payload; Telegram additionally sends images/songs/PDFs from metadata.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| services/connectors/telegram.go | Subscriber now receives ConversationMessage; sends media (images/songs/PDFs) from metadata before the text message. |
| services/connectors/slack.go | Update subscriber to read message content from ccm.Message.Content. |
| services/connectors/matrix.go | Update subscriber to read message content from ccm.Message.Content. |
| services/connectors/irc.go | Update subscriber to read message content from ccm.Message.Content. |
| services/connectors/email.go | Update subscriber to read message content from ccm.Message.Content. |
| services/connectors/discord.go | Update subscriber to read message content from ccm.Message.Content. |
| core/types/conversation.go | Add ConversationMessage type + helpers for attaching metadata. |
| core/agent/options.go | Change option type for new-conversation subscribers to func(*types.ConversationMessage). |
| core/agent/agent_test.go | Update subscriber test to handle ConversationMessage. |
| core/agent/agent.go | Change new-conversation channel/subscribers to ConversationMessage; merge action metadata into job metadata and forward it on new_conversation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Get accumulated metadata from job (e.g., images, files generated by previous actions in this job) | ||
| // This is per-job metadata, so parallel jobs won't interfere with each other | ||
| metadata := job.Metadata | ||
|
|
||
| go func(agent *Agent) { | ||
| xlog.Info("Sending new conversation to channel", "agent", agent.Character.Name, "message", msg.Content) | ||
| agent.newConversations <- msg | ||
| xlog.Info("Sending new conversation to channel", "agent", agent.Character.Name, "message", msg.Content, "metadata_keys", len(metadata)) | ||
| // Send ConversationMessage with both the message and accumulated metadata | ||
| agent.newConversations <- types.NewConversationMessage(msg).WithMetadata(metadata) | ||
| // Job metadata is automatically cleared when job finishes, no need to manually clear |
There was a problem hiding this comment.
metadata := job.Metadata passes the job’s metadata map by reference into a goroutine and then to subscribers. This risks data races or accidental mutation by subscribers. Create a defensive copy of the map (and of any slice values you expect) before sending it in ConversationMessage.
| // Get accumulated metadata from job (e.g., images, files generated by previous actions in this job) | ||
| // This is per-job metadata, so parallel jobs won't interfere with each other | ||
| metadata := job.Metadata | ||
|
|
||
| go func(agent *Agent) { | ||
| xlog.Info("Sending new conversation to channel", "agent", agent.Character.Name, "message", msg.Content) | ||
| agent.newConversations <- msg | ||
| xlog.Info("Sending new conversation to channel", "agent", agent.Character.Name, "message", msg.Content, "metadata_keys", len(metadata)) | ||
| // Send ConversationMessage with both the message and accumulated metadata | ||
| agent.newConversations <- types.NewConversationMessage(msg).WithMetadata(metadata) | ||
| // Job metadata is automatically cleared when job finishes, no need to manually clear |
There was a problem hiding this comment.
There’s no test asserting that action metadata is propagated to WithNewConversationSubscriber / AddSubscriber consumers. Add a unit/integration test that runs a job with an action returning metadata (e.g. images_url) followed by new_conversation, and verify the subscriber receives the expected metadata map.
| if imagesUrls, exists := ccm.Metadata[actions.MetadataImages]; exists { | ||
| for _, url := range xstrings.UniqueSlice(imagesUrls.([]string)) { | ||
| xlog.Debug("Sending photo from new conversation", "url", url) | ||
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | ||
| if err := sendImageToTelegram(ctx, t.bot, chatID, url); err != nil { | ||
| xlog.Error("Error handling image", "error", err) |
There was a problem hiding this comment.
In this subscriber, strconv.ParseInt(t.channelID, 10, 64) errors are ignored; if channelID is non-numeric (e.g. @channelusername) or malformed, chatID becomes 0 and media will be sent to an invalid chat. Parse the chat ID once before the loops, handle the error (log + return), and reuse the parsed value for all sends.
| if imagesUrls, exists := ccm.Metadata[actions.MetadataImages]; exists { | ||
| for _, url := range xstrings.UniqueSlice(imagesUrls.([]string)) { | ||
| xlog.Debug("Sending photo from new conversation", "url", url) | ||
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | ||
| if err := sendImageToTelegram(ctx, t.bot, chatID, url); err != nil { | ||
| xlog.Error("Error handling image", "error", err) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Handle songs from generate_song action (local file paths) | ||
| if songPaths, exists := ccm.Metadata[actions.MetadataSongs]; exists { | ||
| for _, path := range xstrings.UniqueSlice(songPaths.([]string)) { | ||
| xlog.Debug("Sending song from new conversation", "path", path) |
There was a problem hiding this comment.
The metadata values are asserted as ([]string) without checking the dynamic type (e.g. imagesUrls.([]string)). If any action (or user-defined action) populates these metadata keys with a different type (like []any), this will panic. Use a safe type assertion/type switch and skip/log unexpected types.
| // Handle images from gen image actions | ||
| if imagesUrls, exists := ccm.Metadata[actions.MetadataImages]; exists { | ||
| for _, url := range xstrings.UniqueSlice(imagesUrls.([]string)) { | ||
| xlog.Debug("Sending photo from new conversation", "url", url) | ||
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | ||
| if err := sendImageToTelegram(ctx, t.bot, chatID, url); err != nil { | ||
| xlog.Error("Error handling image", "error", err) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Handle songs from generate_song action (local file paths) | ||
| if songPaths, exists := ccm.Metadata[actions.MetadataSongs]; exists { | ||
| for _, path := range xstrings.UniqueSlice(songPaths.([]string)) { | ||
| xlog.Debug("Sending song from new conversation", "path", path) | ||
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | ||
| if err := sendSongToTelegram(ctx, t.bot, chatID, path); err != nil { | ||
| xlog.Error("Error sending song", "error", err) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Handle PDFs from generate_pdf action (local file paths) | ||
| if pdfPaths, exists := ccm.Metadata[actions.MetadataPDFs]; exists { | ||
| for _, path := range xstrings.UniqueSlice(pdfPaths.([]string)) { | ||
| data, err := os.ReadFile(path) | ||
| if err != nil { | ||
| xlog.Error("Error reading PDF file", "path", path, "error", err) | ||
| continue | ||
| } | ||
|
|
||
| filename := filepath.Base(path) | ||
| if filename == "" || filename == "." { | ||
| filename = "document.pdf" | ||
| } | ||
|
|
||
| xlog.Debug("Sending PDF document from new conversation", "filename", filename, "size", len(data)) | ||
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | ||
| _, err = t.bot.SendDocument(ctx, &bot.SendDocumentParams{ | ||
| ChatID: chatID, | ||
| Document: &models.InputFileUpload{ | ||
| Filename: filename, | ||
| Data: bytes.NewReader(data), | ||
| }, | ||
| Caption: "Generated PDF", | ||
| }) | ||
| if err != nil { | ||
| xlog.Error("Error sending PDF", "error", err) | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
This block duplicates the multimedia handling already implemented in handleMultimediaContent (same images/songs/pdfs logic). Consider extracting a shared helper for “send metadata attachments” and reuse it here to avoid divergent behavior/bugs when one path is updated and the other isn’t.
| // Handle images from gen image actions | |
| if imagesUrls, exists := ccm.Metadata[actions.MetadataImages]; exists { | |
| for _, url := range xstrings.UniqueSlice(imagesUrls.([]string)) { | |
| xlog.Debug("Sending photo from new conversation", "url", url) | |
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | |
| if err := sendImageToTelegram(ctx, t.bot, chatID, url); err != nil { | |
| xlog.Error("Error handling image", "error", err) | |
| } | |
| } | |
| } | |
| // Handle songs from generate_song action (local file paths) | |
| if songPaths, exists := ccm.Metadata[actions.MetadataSongs]; exists { | |
| for _, path := range xstrings.UniqueSlice(songPaths.([]string)) { | |
| xlog.Debug("Sending song from new conversation", "path", path) | |
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | |
| if err := sendSongToTelegram(ctx, t.bot, chatID, path); err != nil { | |
| xlog.Error("Error sending song", "error", err) | |
| } | |
| } | |
| } | |
| // Handle PDFs from generate_pdf action (local file paths) | |
| if pdfPaths, exists := ccm.Metadata[actions.MetadataPDFs]; exists { | |
| for _, path := range xstrings.UniqueSlice(pdfPaths.([]string)) { | |
| data, err := os.ReadFile(path) | |
| if err != nil { | |
| xlog.Error("Error reading PDF file", "path", path, "error", err) | |
| continue | |
| } | |
| filename := filepath.Base(path) | |
| if filename == "" || filename == "." { | |
| filename = "document.pdf" | |
| } | |
| xlog.Debug("Sending PDF document from new conversation", "filename", filename, "size", len(data)) | |
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | |
| _, err = t.bot.SendDocument(ctx, &bot.SendDocumentParams{ | |
| ChatID: chatID, | |
| Document: &models.InputFileUpload{ | |
| Filename: filename, | |
| Data: bytes.NewReader(data), | |
| }, | |
| Caption: "Generated PDF", | |
| }) | |
| if err != nil { | |
| xlog.Error("Error sending PDF", "error", err) | |
| } | |
| } | |
| } | |
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | |
| if err := handleMultimediaContent(ctx, t.bot, chatID, ccm.Metadata); err != nil { | |
| xlog.Error("Error handling multimedia content", "error", err) | |
| } |
| data, err := os.ReadFile(path) | ||
| if err != nil { | ||
| xlog.Error("Error reading PDF file", "path", path, "error", err) | ||
| continue | ||
| } |
There was a problem hiding this comment.
Reading PDFs with os.ReadFile(path) loads the entire document into memory. Since these files can be user-generated and potentially large, add a size check (and/or stream upload if supported by the Telegram client) to avoid high memory usage or OOMs when sending PDFs from metadata.
| case msg := <-a.newConversations: | ||
| xlog.Debug("New conversation", "agent", a.Character.Name, "message", msg.Content) | ||
| xlog.Debug("New conversation", "agent", a.Character.Name, "message", msg.Message.Content) | ||
| a.subscriberMutex.Lock() |
There was a problem hiding this comment.
msg is dereferenced in the debug log (msg.Message.Content) before checking msg != nil, so a nil message sent on a.newConversations would panic. Move the nil check before any dereference/logging in this select case.
| job.Metadata = make(map[string]interface{}) | ||
| } | ||
| for key, value := range actionResult.Metadata { | ||
| job.Metadata[key] = value |
There was a problem hiding this comment.
The “merge” of actionResult.Metadata into job.Metadata overwrites existing keys, so multiple actions producing the same metadata key (e.g. multiple images_url results) will lose earlier values. If the intent is to accumulate attachments across actions, append/union slices for known keys (or implement a generic merge strategy) instead of unconditional overwrite.
| job.Metadata[key] = value | |
| // If the key does not exist yet, just set it. | |
| if existing, ok := job.Metadata[key]; ok { | |
| switch v := value.(type) { | |
| // Append []string values when both old and new are []string. | |
| case []string: | |
| if exSlice, ok := existing.([]string); ok { | |
| job.Metadata[key] = append(exSlice, v...) | |
| continue | |
| } | |
| // Append []interface{} values when both old and new are []interface{}. | |
| case []interface{}: | |
| if exSlice, ok := existing.([]interface{}); ok { | |
| job.Metadata[key] = append(exSlice, v...) | |
| continue | |
| } | |
| } | |
| // Fallback: overwrite (preserves previous behavior for non-slice types). | |
| job.Metadata[key] = value | |
| } else { | |
| job.Metadata[key] = value | |
| } |
No description provided.