Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ serde_json = "1.0"
toml = "0.8"
rmp-serde = "1"

deadpool = "0.10"
cached = { version = "0.56.0", features = ["async"] }

anyhow = "1.0"
Expand Down
99 changes: 35 additions & 64 deletions src/services/ws/stable/tts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,84 +122,55 @@ impl TTSSession {
}
}

pub struct TTSSessionPool {
pub config: crate::config::TTSConfig,
pub workers: usize,
pub pool: tokio::sync::mpsc::UnboundedReceiver<tokio::sync::oneshot::Sender<TTSRequest>>,
pub tx: tokio::sync::mpsc::UnboundedSender<tokio::sync::oneshot::Sender<TTSRequest>>,
pub struct TTSManager {
config: crate::config::TTSConfig,
}

impl TTSSessionPool {
pub fn new(config: crate::config::TTSConfig, workers: usize) -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
TTSSessionPool {
config,
workers,
pool: rx,
tx,
}
}
impl deadpool::managed::Manager for TTSManager {
type Type = TTSSession;
type Error = anyhow::Error;

pub async fn create_session(&self) -> anyhow::Result<TTSSession> {
async fn create(&self) -> Result<TTSSession, anyhow::Error> {
TTSSession::new_from_config(&self.config).await
}

pub async fn run_session(
id: u128,
mut session: TTSSession,
tx: tokio::sync::mpsc::UnboundedSender<tokio::sync::oneshot::Sender<TTSRequest>>,
) -> anyhow::Result<()> {
log::info!("{} starting TTS session worker", id);
loop {
let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
tx.send(resp_tx)
.map_err(|e| anyhow::anyhow!("send session request error: {}", e))?;

let (text, tts_resp_tx) = resp_rx
.await
.map_err(|e| anyhow::anyhow!("receive session request error: {}", e))?;

log::info!("{} processing TTS request: {}", id, text);

if let Err(e) = session.synthesize(&text, &tts_resp_tx).await {
log::error!("{} TTS synthesis error: {}", id, e);
}
}
async fn recycle(
&self,
_obj: &mut TTSSession,
_metrics: &deadpool::managed::Metrics,
) -> deadpool::managed::RecycleResult<anyhow::Error> {
Ok(())
}
}

pub struct TTSSessionPool {
pool: deadpool::managed::Pool<TTSManager>,
}

async fn get_req_tx(&mut self) -> anyhow::Result<tokio::sync::oneshot::Sender<TTSRequest>> {
let req_tx = self
.pool
.recv()
.await
.ok_or_else(|| anyhow::anyhow!("no available tts session"))?;
Ok(req_tx)
impl TTSSessionPool {
pub fn new(config: crate::config::TTSConfig, workers: usize) -> Self {
let manager = TTSManager { config };
let pool = deadpool::managed::Pool::builder(manager)
.max_size(workers)
.build()
.expect("Failed to create TTS session pool");
TTSSessionPool { pool }
}

pub async fn run_loop(&mut self, mut rx: TTSRequestRx) -> anyhow::Result<()> {
let mut sucess_workers = 0;
for i in 0..self.workers {
match self.create_session().await {
Ok(session) => {
tokio::spawn(Self::run_session(i as u128, session, self.tx.clone()));
sucess_workers += 1;
while let Some((text, tts_resp_tx)) = rx.recv().await {
match self.pool.get().await {
Ok(mut session) => {
tokio::spawn(async move {
log::info!("Processing TTS request: {}", text);
if let Err(e) = session.synthesize(&text, &tts_resp_tx).await {
log::error!("TTS synthesis error: {}", e);
}
});
}
Err(e) => {
log::error!("create tts session[{i}] error: {}", e);
continue;
log::error!("Failed to get TTS session from pool: {}", e);
}
};
}

if sucess_workers == 0 {
return Err(anyhow::anyhow!("no available tts session worker"));
}

while let Some(tts_req) = rx.recv().await {
let req_tx = self.get_req_tx().await?;

if let Err(e) = req_tx.send(tts_req) {
log::error!("send tts request to session error: {}", e.0);
}
}
Ok(())
Expand Down