From f2ac86c1117b620595b0770577c4eb8654ff02cd Mon Sep 17 00:00:00 2001 From: Gris Ge Date: Tue, 3 Feb 2026 15:01:59 +0800 Subject: [PATCH] Change TryStream to Stream * Align with `rtnetlink` crate. * `Stream` allow us to do map, join, select actions while TryStream cannot. Signed-off-by: Gris Ge --- .gitignore | 2 +- examples/dump_channels.rs | 7 ++++--- examples/dump_coalesce.rs | 9 +++++---- examples/dump_eeprom_page.rs | 7 ++++--- examples/dump_features.rs | 7 ++++--- examples/dump_fec.rs | 6 +++--- examples/dump_link_mode.rs | 6 +++--- examples/dump_pause.rs | 7 ++++--- examples/dump_rings.rs | 7 ++++--- examples/dump_tsinfo.rs | 7 ++++--- src/channel/get.rs | 8 +++++--- src/coalesce/get.rs | 8 +++++--- src/eeprom/get.rs | 8 +++++--- src/feature/get.rs | 8 +++++--- src/fec/get.rs | 8 +++++--- src/handle.rs | 23 +++++++++-------------- src/link_mode/get.rs | 8 +++++--- src/pause/get.rs | 8 +++++--- src/ring/get.rs | 8 +++++--- src/tsinfo/get.rs | 8 +++++--- tests/dump_link_modes.rs | 7 ++++--- tests/get_features_lo.rs | 7 ++++--- 22 files changed, 98 insertions(+), 76 deletions(-) diff --git a/.gitignore b/.gitignore index bb55a70..a0bdd6c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ Cargo.lock target vendor/ - +tags *.swp diff --git a/examples/dump_channels.rs b/examples/dump_channels.rs index 605dc19..cb59924 100644 --- a/examples/dump_channels.rs +++ b/examples/dump_channels.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::stream::TryStreamExt; +use futures_util::stream::StreamExt; // Once we find a way to load netsimdev kernel module in CI, we can convert this // to a test @@ -16,10 +16,11 @@ async fn get_channels(iface_name: Option<&str>) { let (connection, mut handle, _) = ethtool::new_connection().unwrap(); tokio::spawn(connection); - let mut channel_handle = handle.channel().get(iface_name).execute().await; + let mut channel_handle = + handle.channel().get(iface_name).execute().await.unwrap(); let mut msgs = Vec::new(); - while let Some(msg) = channel_handle.try_next().await.unwrap() { + while let Some(Ok(msg)) = channel_handle.next().await { msgs.push(msg); } assert!(!msgs.is_empty()); diff --git a/examples/dump_coalesce.rs b/examples/dump_coalesce.rs index 22387ff..ee2c9fc 100644 --- a/examples/dump_coalesce.rs +++ b/examples/dump_coalesce.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::stream::TryStreamExt; +use futures_util::stream::StreamExt; // Once we find a way to load netsimdev kernel module in CI, we can convert this // to a test @@ -16,11 +16,12 @@ async fn get_coalesce(iface_name: Option<&str>) { let (connection, mut handle, _) = ethtool::new_connection().unwrap(); tokio::spawn(connection); - let mut coalesce_handle = handle.coalesce().get(iface_name).execute().await; + let mut coalesce_handle = + handle.coalesce().get(iface_name).execute().await.unwrap(); let mut msgs = Vec::new(); - while let Some(msg) = coalesce_handle.try_next().await.unwrap() { - msgs.push(msg); + while let Some(Ok(msg)) = coalesce_handle.next().await { + msgs.push(msg.payload); } assert!(!msgs.is_empty()); for msg in msgs { diff --git a/examples/dump_eeprom_page.rs b/examples/dump_eeprom_page.rs index 3c6389b..1753573 100644 --- a/examples/dump_eeprom_page.rs +++ b/examples/dump_eeprom_page.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::stream::TryStreamExt; +use futures_util::stream::StreamExt; fn main() { let rt = tokio::runtime::Builder::new_current_thread() @@ -19,10 +19,11 @@ async fn get_eeprom(iface_name: Option<&str>) { .eeprom() .get(iface_name, 0, 1, 0, 0, 0x50) .execute() - .await; + .await + .unwrap(); let mut msgs = Vec::new(); - while let Some(msg) = eeprom_handle.try_next().await.unwrap() { + while let Some(Ok(msg)) = eeprom_handle.next().await { msgs.push(msg); } assert!(!msgs.is_empty()); diff --git a/examples/dump_features.rs b/examples/dump_features.rs index 0540f6a..ed4022f 100644 --- a/examples/dump_features.rs +++ b/examples/dump_features.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::stream::TryStreamExt; +use futures_util::stream::StreamExt; // Once we find a way to load netsimdev kernel module in CI, we can convert this // to a test @@ -16,10 +16,11 @@ async fn get_feature(iface_name: Option<&str>) { let (connection, mut handle, _) = ethtool::new_connection().unwrap(); tokio::spawn(connection); - let mut feature_handle = handle.feature().get(iface_name).execute().await; + let mut feature_handle = + handle.feature().get(iface_name).execute().await.unwrap(); let mut msgs = Vec::new(); - while let Some(msg) = feature_handle.try_next().await.unwrap() { + while let Some(Ok(msg)) = feature_handle.next().await { msgs.push(msg); } assert!(!msgs.is_empty()); diff --git a/examples/dump_fec.rs b/examples/dump_fec.rs index 18de147..e3bde18 100644 --- a/examples/dump_fec.rs +++ b/examples/dump_fec.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::stream::TryStreamExt; +use futures_util::stream::StreamExt; // Once we find a way to load netsimdev kernel module in CI, we can convert this // to a test @@ -17,10 +17,10 @@ async fn get_fec(iface_name: Option<&str>) { let (connection, mut handle, _) = ethtool::new_connection().unwrap(); tokio::spawn(connection); - let mut fec_handle = handle.fec().get(iface_name).execute().await; + let mut fec_handle = handle.fec().get(iface_name).execute().await.unwrap(); let mut msgs = Vec::new(); - while let Some(msg) = fec_handle.try_next().await.unwrap() { + while let Some(Ok(msg)) = fec_handle.next().await { msgs.push(msg); } assert!(!msgs.is_empty()); diff --git a/examples/dump_link_mode.rs b/examples/dump_link_mode.rs index f9e6bec..3b1d45d 100644 --- a/examples/dump_link_mode.rs +++ b/examples/dump_link_mode.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::stream::TryStreamExt; +use futures_util::stream::StreamExt; // Once we find a way to load netsimdev kernel module in CI, we can convert this // to a test @@ -17,10 +17,10 @@ async fn get_link_mode(iface_name: Option<&str>) { tokio::spawn(connection); let mut link_mode_handle = - handle.link_mode().get(iface_name).execute().await; + handle.link_mode().get(iface_name).execute().await.unwrap(); let mut msgs = Vec::new(); - while let Some(msg) = link_mode_handle.try_next().await.unwrap() { + while let Some(Ok(msg)) = link_mode_handle.next().await { msgs.push(msg); } assert!(!msgs.is_empty()); diff --git a/examples/dump_pause.rs b/examples/dump_pause.rs index 4f75b9e..7834e61 100644 --- a/examples/dump_pause.rs +++ b/examples/dump_pause.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::stream::TryStreamExt; +use futures_util::stream::StreamExt; // Once we find a way to load netsimdev kernel module in CI, we can convert this // to a test @@ -17,10 +17,11 @@ async fn get_pause(iface_name: Option<&str>) { let (connection, mut handle, _) = ethtool::new_connection().unwrap(); tokio::spawn(connection); - let mut pause_handle = handle.pause().get(iface_name).execute().await; + let mut pause_handle = + handle.pause().get(iface_name).execute().await.unwrap(); let mut msgs = Vec::new(); - while let Some(msg) = pause_handle.try_next().await.unwrap() { + while let Some(Ok(msg)) = pause_handle.next().await { msgs.push(msg); } assert!(!msgs.is_empty()); diff --git a/examples/dump_rings.rs b/examples/dump_rings.rs index 75c405b..643d4a9 100644 --- a/examples/dump_rings.rs +++ b/examples/dump_rings.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::stream::TryStreamExt; +use futures_util::stream::StreamExt; // Once we find a way to load netsimdev kernel module in CI, we can convert this // to a test @@ -16,10 +16,11 @@ async fn get_ring(iface_name: Option<&str>) { let (connection, mut handle, _) = ethtool::new_connection().unwrap(); tokio::spawn(connection); - let mut ring_handle = handle.ring().get(iface_name).execute().await; + let mut ring_handle = + handle.ring().get(iface_name).execute().await.unwrap(); let mut msgs = Vec::new(); - while let Some(msg) = ring_handle.try_next().await.unwrap() { + while let Some(Ok(msg)) = ring_handle.next().await { msgs.push(msg); } assert!(!msgs.is_empty()); diff --git a/examples/dump_tsinfo.rs b/examples/dump_tsinfo.rs index 2f4c628..0d1621e 100644 --- a/examples/dump_tsinfo.rs +++ b/examples/dump_tsinfo.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::stream::TryStreamExt; +use futures_util::stream::StreamExt; // Once we find a way to load netsimdev kernel module in CI, we can convert this // to a test @@ -17,10 +17,11 @@ async fn get_tsinfo(iface_name: Option<&str>) { let (connection, mut handle, _) = ethtool::new_connection().unwrap(); tokio::spawn(connection); - let mut tsinfo_handle = handle.tsinfo().get(iface_name).execute().await; + let mut tsinfo_handle = + handle.tsinfo().get(iface_name).execute().await.unwrap(); let mut msgs = Vec::new(); - while let Some(msg) = tsinfo_handle.try_next().await.unwrap() { + while let Some(Ok(msg)) = tsinfo_handle.next().await { msgs.push(msg); } assert!(!msgs.is_empty()); diff --git a/src/channel/get.rs b/src/channel/get.rs index 5a28324..c17c781 100644 --- a/src/channel/get.rs +++ b/src/channel/get.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::TryStream; +use futures_util::Stream; use netlink_packet_generic::GenlMessage; use crate::{ethtool_execute, EthtoolError, EthtoolHandle, EthtoolMessage}; @@ -20,8 +20,10 @@ impl EthtoolChannelGetRequest { pub async fn execute( self, - ) -> impl TryStream, Error = EthtoolError> - { + ) -> Result< + impl Stream, EthtoolError>>, + EthtoolError, + > { let EthtoolChannelGetRequest { mut handle, iface_name, diff --git a/src/coalesce/get.rs b/src/coalesce/get.rs index baf0faa..36c8723 100644 --- a/src/coalesce/get.rs +++ b/src/coalesce/get.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::TryStream; +use futures_util::Stream; use netlink_packet_generic::GenlMessage; use crate::{ethtool_execute, EthtoolError, EthtoolHandle, EthtoolMessage}; @@ -20,8 +20,10 @@ impl EthtoolCoalesceGetRequest { pub async fn execute( self, - ) -> impl TryStream, Error = EthtoolError> - { + ) -> Result< + impl Stream, EthtoolError>>, + EthtoolError, + > { let EthtoolCoalesceGetRequest { mut handle, iface_name, diff --git a/src/eeprom/get.rs b/src/eeprom/get.rs index 677b7cd..a618115 100644 --- a/src/eeprom/get.rs +++ b/src/eeprom/get.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::TryStream; +use futures_util::Stream; use netlink_packet_generic::GenlMessage; use crate::{ethtool_execute, EthtoolError, EthtoolHandle, EthtoolMessage}; @@ -38,8 +38,10 @@ impl EthtoolModuleEEPROMGetRequest { pub async fn execute( self, - ) -> impl TryStream, Error = EthtoolError> - { + ) -> Result< + impl Stream, EthtoolError>>, + EthtoolError, + > { let EthtoolModuleEEPROMGetRequest { mut handle, iface_name, diff --git a/src/feature/get.rs b/src/feature/get.rs index 89b07a7..19ae293 100644 --- a/src/feature/get.rs +++ b/src/feature/get.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::TryStream; +use futures_util::Stream; use netlink_packet_generic::GenlMessage; use crate::{ethtool_execute, EthtoolError, EthtoolHandle, EthtoolMessage}; @@ -20,8 +20,10 @@ impl EthtoolFeatureGetRequest { pub async fn execute( self, - ) -> impl TryStream, Error = EthtoolError> - { + ) -> Result< + impl Stream, EthtoolError>>, + EthtoolError, + > { let EthtoolFeatureGetRequest { mut handle, iface_name, diff --git a/src/fec/get.rs b/src/fec/get.rs index 7ec440f..1122ae3 100644 --- a/src/fec/get.rs +++ b/src/fec/get.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::TryStream; +use futures_util::Stream; use netlink_packet_generic::GenlMessage; use crate::{ethtool_execute, EthtoolError, EthtoolHandle, EthtoolMessage}; @@ -20,8 +20,10 @@ impl EthtoolFecGetRequest { pub async fn execute( self, - ) -> impl TryStream, Error = EthtoolError> - { + ) -> Result< + impl Stream, EthtoolError>>, + EthtoolError, + > { let EthtoolFecGetRequest { mut handle, iface_name, diff --git a/src/handle.rs b/src/handle.rs index feba8d4..95267f9 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::{future::Either, FutureExt, Stream, StreamExt, TryStream}; +use futures_util::{Stream, StreamExt}; use genetlink::GenetlinkHandle; use netlink_packet_core::DecodeError; use netlink_packet_core::{ @@ -83,7 +83,10 @@ pub(crate) async fn ethtool_execute( handle: &mut EthtoolHandle, is_dump: bool, ethtool_msg: EthtoolMessage, -) -> impl TryStream, Error = EthtoolError> { +) -> Result< + impl Stream, EthtoolError>>, + EthtoolError, +> { let nl_header_flags = if is_dump { // The NLM_F_ACK is required due to bug of kernel: // https://bugzilla.redhat.com/show_bug.cgi?id=1953847 @@ -101,16 +104,8 @@ pub(crate) async fn ethtool_execute( nl_msg.header.flags = nl_header_flags; - match handle.request(nl_msg).await { - Ok(response) => { - Either::Left(response.map(move |msg| Ok(try_ethtool!(msg)))) - } - Err(e) => Either::Right( - futures_util::future::err::< - GenlMessage, - EthtoolError, - >(e) - .into_stream(), - ), - } + Ok(handle + .request(nl_msg) + .await? + .map(move |msg| Ok(try_ethtool!(msg)))) } diff --git a/src/link_mode/get.rs b/src/link_mode/get.rs index 256a53f..7c64bea 100644 --- a/src/link_mode/get.rs +++ b/src/link_mode/get.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::TryStream; +use futures_util::Stream; use netlink_packet_generic::GenlMessage; use crate::{ethtool_execute, EthtoolError, EthtoolHandle, EthtoolMessage}; @@ -20,8 +20,10 @@ impl EthtoolLinkModeGetRequest { pub async fn execute( self, - ) -> impl TryStream, Error = EthtoolError> - { + ) -> Result< + impl Stream, EthtoolError>>, + EthtoolError, + > { let EthtoolLinkModeGetRequest { mut handle, iface_name, diff --git a/src/pause/get.rs b/src/pause/get.rs index 5f7ae94..66e9b47 100644 --- a/src/pause/get.rs +++ b/src/pause/get.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::TryStream; +use futures_util::Stream; use netlink_packet_generic::GenlMessage; use crate::{ethtool_execute, EthtoolError, EthtoolHandle, EthtoolMessage}; @@ -20,8 +20,10 @@ impl EthtoolPauseGetRequest { pub async fn execute( self, - ) -> impl TryStream, Error = EthtoolError> - { + ) -> Result< + impl Stream, EthtoolError>>, + EthtoolError, + > { let EthtoolPauseGetRequest { mut handle, iface_name, diff --git a/src/ring/get.rs b/src/ring/get.rs index fba6965..0719c51 100644 --- a/src/ring/get.rs +++ b/src/ring/get.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::TryStream; +use futures_util::Stream; use netlink_packet_generic::GenlMessage; use crate::{ethtool_execute, EthtoolError, EthtoolHandle, EthtoolMessage}; @@ -20,8 +20,10 @@ impl EthtoolRingGetRequest { pub async fn execute( self, - ) -> impl TryStream, Error = EthtoolError> - { + ) -> Result< + impl Stream, EthtoolError>>, + EthtoolError, + > { let EthtoolRingGetRequest { mut handle, iface_name, diff --git a/src/tsinfo/get.rs b/src/tsinfo/get.rs index f65de30..5a7779c 100644 --- a/src/tsinfo/get.rs +++ b/src/tsinfo/get.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::TryStream; +use futures_util::Stream; use netlink_packet_generic::GenlMessage; use crate::{ethtool_execute, EthtoolError, EthtoolHandle, EthtoolMessage}; @@ -20,8 +20,10 @@ impl EthtoolTsInfoGetRequest { pub async fn execute( self, - ) -> impl TryStream, Error = EthtoolError> - { + ) -> Result< + impl Stream, EthtoolError>>, + EthtoolError, + > { let EthtoolTsInfoGetRequest { mut handle, iface_name, diff --git a/tests/dump_link_modes.rs b/tests/dump_link_modes.rs index 0f631a8..4d9b34f 100644 --- a/tests/dump_link_modes.rs +++ b/tests/dump_link_modes.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::stream::TryStreamExt; +use futures_util::stream::StreamExt; #[test] // CI container normally have a veth for external communication which support @@ -17,10 +17,11 @@ async fn dump_link_modes() { let (connection, mut handle, _) = ethtool::new_connection().unwrap(); tokio::spawn(connection); - let mut link_modes_handle = handle.link_mode().get(None).execute().await; + let mut link_modes_handle = + handle.link_mode().get(None).execute().await.unwrap(); let mut msgs = Vec::new(); - while let Some(msg) = link_modes_handle.try_next().await.unwrap() { + while let Some(Ok(msg)) = link_modes_handle.next().await { msgs.push(msg); } assert!(!msgs.is_empty()); diff --git a/tests/get_features_lo.rs b/tests/get_features_lo.rs index 1cd3903..08dd2e2 100644 --- a/tests/get_features_lo.rs +++ b/tests/get_features_lo.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT -use futures_util::stream::TryStreamExt; +use futures_util::stream::StreamExt; #[test] fn test_get_features_of_loopback() { @@ -15,10 +15,11 @@ async fn get_feature(iface_name: Option<&str>) { let (connection, mut handle, _) = ethtool::new_connection().unwrap(); tokio::spawn(connection); - let mut feature_handle = handle.feature().get(iface_name).execute().await; + let mut feature_handle = + handle.feature().get(iface_name).execute().await.unwrap(); let mut msgs = Vec::new(); - while let Some(msg) = feature_handle.try_next().await.unwrap() { + while let Some(Ok(msg)) = feature_handle.next().await { msgs.push(msg); } assert!(msgs.len() == 1);