-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrtc_handler.py
More file actions
381 lines (307 loc) · 11.9 KB
/
rtc_handler.py
File metadata and controls
381 lines (307 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, UploadFile, File
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles
import asyncio
from video_handler import VideoHandler, AvatarState
from ai_handler import AIHandler
import json
from typing import Set
import logging
import os
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Mount static files directory for serving HTML, JS, CSS files
# RecroTech widget demo için static dosyaları serve et
app.mount("/static", StaticFiles(directory="static"), name="static")
# Mount avatar_videos directory for serving avatar video files
# Avatar videoları /avatar_videos path'i üzerinden servis edilir
app.mount("/avatar_videos", StaticFiles(directory="avatar_videos"), name="avatar_videos")
class RTCHandler:
def __init__(self, video_dir: str):
"""
Initialize RTC handler for managing avatar video streams.
Args:
video_dir: Directory containing avatar videos
"""
self.video_handler = VideoHandler(video_dir)
self.active_connections: Set[WebSocket] = set()
async def connect(self, websocket: WebSocket):
"""Accept and register a new WebSocket connection."""
await websocket.accept()
self.active_connections.add(websocket)
logger.info(f"Client connected. Total connections: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
"""Remove a WebSocket connection."""
self.active_connections.discard(websocket)
logger.info(f"Client disconnected. Total connections: {len(self.active_connections)}")
async def broadcast_frame(self, frame_bytes: bytes):
"""Broadcast a video frame to all connected clients."""
disconnected = set()
for connection in self.active_connections:
try:
await connection.send_bytes(frame_bytes)
except Exception as e:
logger.error(f"Error sending frame: {e}")
disconnected.add(connection)
# Remove disconnected clients
for conn in disconnected:
self.disconnect(conn)
async def handle_message(self, websocket: WebSocket, message: str):
"""
Handle incoming WebSocket messages to control avatar state.
Expected message format: {"action": "standing|listening|thinking|talking"}
"""
try:
data = json.loads(message)
action = data.get("action", "").lower()
if action == "standing":
self.video_handler.play_standing()
elif action == "listening":
self.video_handler.play_listening()
elif action == "thinking":
self.video_handler.play_thinking()
elif action == "talking":
self.video_handler.play_talking()
else:
await websocket.send_json({
"error": f"Unknown action: {action}",
"valid_actions": ["standing", "listening", "thinking", "talking"]
})
return
# Confirm state change
await websocket.send_json({
"status": "success",
"current_state": self.video_handler.get_current_state().value
})
logger.info(f"Avatar state changed to: {action}")
except json.JSONDecodeError:
await websocket.send_json({"error": "Invalid JSON message"})
except Exception as e:
logger.error(f"Error handling message: {e}")
await websocket.send_json({"error": str(e)})
# Initialize handlers
rtc_handler = RTCHandler(video_dir="./avatar_videos")
system_prompt = """
You are a helpful AI assistant. Respond to user queries in max 2 sentences.
Be friendly, concise, and natural in your responses.
"""
ai_handler = AIHandler(system_prompt, rtc_handler=rtc_handler)
@app.get("/")
async def root():
"""Health check endpoint."""
return {
"status": "running",
"current_state": rtc_handler.video_handler.get_current_state().value,
"is_speaking": ai_handler.is_speaking
}
@app.websocket("/ws/avatar")
async def websocket_endpoint(websocket: WebSocket):
"""
WebSocket endpoint for avatar video streaming.
Clients can send JSON messages to control avatar state:
{"action": "standing"} - Switch to standing video
{"action": "listening"} - Switch to listening video
{"action": "thinking"} - Switch to thinking video
{"action": "talking"} - Switch to talking video
{"action": "writing"} - Switch to writing video
"""
await rtc_handler.connect(websocket)
# Start streaming task
stream_task = asyncio.create_task(
rtc_handler.video_handler.stream_frames(rtc_handler.broadcast_frame)
)
try:
while True:
# Receive messages from client
message = await websocket.receive_text()
await rtc_handler.handle_message(websocket, message)
except WebSocketDisconnect:
rtc_handler.disconnect(websocket)
logger.info("Client disconnected normally")
except Exception as e:
logger.error(f"WebSocket error: {e}")
rtc_handler.disconnect(websocket)
finally:
stream_task.cancel()
try:
await stream_task
except asyncio.CancelledError:
pass
@app.get("/stream/avatar")
async def http_stream():
"""
HTTP endpoint for MJPEG video streaming.
Can be used directly in HTML <img> tags.
"""
async def generate():
while True:
frame = rtc_handler.video_handler.get_frame()
if frame:
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
await asyncio.sleep(0.033) # ~30 FPS
return StreamingResponse(
generate(),
media_type="multipart/x-mixed-replace; boundary=frame"
)
@app.post("/avatar/state/{state}")
async def set_avatar_state(state: str):
"""
REST endpoint to change avatar state.
Args:
state: One of "standing", "listening", "thinking", "talking"
"""
state = state.lower()
if state == "standing":
rtc_handler.video_handler.play_standing()
elif state == "listening":
rtc_handler.video_handler.play_listening()
elif state == "thinking":
rtc_handler.video_handler.play_thinking()
elif state == "talking":
rtc_handler.video_handler.play_talking()
elif state == "writing":
rtc_handler.video_handler.play_writing()
else:
return {
"error": f"Invalid state: {state}",
"valid_states": ["standing", "listening", "thinking", "talking", "writing"]
}, 400
return {
"status": "success",
"current_state": rtc_handler.video_handler.get_current_state().value
}
@app.post("/ai/speak")
async def speak_text(text: str):
"""
Make the AI avatar speak the given text.
Avatar will switch to talking state and play audio.
"""
asyncio.create_task(ai_handler.speak_with_avatar(text))
return {
"status": "speaking",
"text": text
}
# Replace your /ai/chat endpoint with this debugged version:
# Replace your /ai/chat endpoint with this fixed version:
@app.post("/ai/chat")
async def chat(message: str):
"""
Send a text message to AI and get TEXT response only.
Avatar will animate: LISTENING -> THINKING -> TALKING -> STANDING
"""
print(f"\n{'=' * 60}")
print(f"💬 CHAT ENDPOINT CALLED")
print(f"Message: {message}")
print(f"{'=' * 60}")
# 1. Set to listening
print("🎭 Setting avatar to LISTENING state...")
rtc_handler.video_handler.play_listening()
await asyncio.sleep(0.5) # Brief pause
# 2. Set to thinking
print("🎭 Setting avatar to THINKING state...")
rtc_handler.video_handler.play_thinking()
print(f"🎭 Current state: {rtc_handler.video_handler.get_current_state().value}")
# Get AI response
print("🤖 Getting AI response...")
response = await ai_handler.send_message_get_answer(message)
if response:
# 3. Set to talking
print("🎭 Setting avatar to TALKING state...")
rtc_handler.video_handler.play_talking()
# Estimate talking duration based on word count
word_count = len(response.split())
# Assume ~2.5 words per second speaking rate
talking_duration = max(2.0, word_count / 2.5) # Minimum 2 seconds
print(f"🗣️ Talking for ~{talking_duration:.1f} seconds ({word_count} words)")
# Schedule return to standing after talking
async def return_to_standing():
await asyncio.sleep(talking_duration)
rtc_handler.video_handler.play_standing()
print("🎭 Returned to STANDING state")
asyncio.create_task(return_to_standing())
return {
"status": "success",
"user_message": message,
"ai_response": response,
"talking_duration": talking_duration
}
else:
print("❌ Failed to get AI response")
rtc_handler.video_handler.play_standing()
return {
"status": "error",
"message": "Failed to get AI response"
}
@app.post("/ai/voice")
async def process_voice(audio: UploadFile = File(...)):
"""
Upload voice audio and get spoken AI response.
Complete workflow: STT -> AI -> TTS with avatar animations.
Returns user transcript and AI response for chat display.
"""
try:
print(f"\n{'=' * 60}")
print(f"🎤 VOICE ENDPOINT CALLED")
print(f"{'=' * 60}")
# Save uploaded audio temporarily
temp_audio = f"temp_input_{audio.filename}"
with open(temp_audio, "wb") as f:
f.write(await audio.read())
# Set to listening
print("🎭 Setting avatar to LISTENING state...")
rtc_handler.video_handler.play_listening()
# Transcribe user speech (STT)
print("🎤 Transcribing audio...")
user_text = await ai_handler.transcribe_audio(temp_audio)
if not user_text:
rtc_handler.video_handler.play_standing()
return {
"status": "error",
"message": "Ses metne dönüştürülemedi"
}
logger.info(f"🎤 Transcription: {user_text}")
# Set to thinking
print("🎭 Setting avatar to THINKING state...")
rtc_handler.video_handler.play_thinking()
print(f"🎭 Current state: {rtc_handler.video_handler.get_current_state().value}")
# Get AI response
print("🤖 Getting AI response...")
ai_response = await ai_handler.send_message_get_answer(user_text)
if not ai_response:
rtc_handler.video_handler.play_standing()
return {
"status": "error",
"message": "AI yanıt üretemedi",
"stt_text": user_text
}
logger.info(f"🤖 AI Response: {ai_response}")
# Generate audio for frontend
print("🔊 Generating TTS audio...")
audio_url = await ai_handler.generate_audio_for_frontend(ai_response)
# Frontend handles audio playback and avatar state - no server-side playback
print("✅ Voice processing complete")
# Clean up temp file
try:
os.remove(temp_audio)
except:
pass
return {
"status": "success",
"stt_text": user_text,
"ai_response": ai_response,
"audio_url": audio_url
}
except Exception as e:
logger.error(f"Voice processing error: {e}")
import traceback
traceback.print_exc()
rtc_handler.video_handler.play_standing()
return {
"status": "error",
"message": str(e)
}
if __name__ == "__main__":
import uvicorn