fix(core): cleanup video streams properly

This commit is contained in:
uncor3
2026-04-07 18:19:02 +00:00
parent 3c5fe6787b
commit c308d4926f
9 changed files with 153 additions and 56 deletions
+2 -1
View File
@@ -60,7 +60,8 @@ MediaPreviewDialog::~MediaPreviewDialog()
{
// Release the streamer if it was used for video
if (m_isVideo) {
MediaStreamerManager::sharedInstance()->releaseStreamer(m_filePath);
MediaStreamerManager::sharedInstance()->releaseStreamer(m_device->udid,
m_filePath);
}
}
+3 -2
View File
@@ -69,7 +69,8 @@ QUrl MediaStreamerManager::getStreamUrl(
return QUrl(rustUrl);
}
void MediaStreamerManager::releaseStreamer(const QString &filePath)
void MediaStreamerManager::releaseStreamer(const QString &udid,
const QString &filePath)
{
QMutexLocker locker(&m_streamersMutex);
auto it = m_streamers.find(filePath);
@@ -83,7 +84,7 @@ void MediaStreamerManager::releaseStreamer(const QString &filePath)
<< filePath;
// delete it->streamer;
AppContext::sharedInstance()->ioManager->release_video_streamer(
it.value().rustUrl);
udid, it.value().rustUrl);
m_streamers.erase(it);
}
}
+1 -1
View File
@@ -37,7 +37,7 @@ public:
std::optional<std::shared_ptr<CXX::HauseArrest>> hause_arrest,
bool useAfc2, const QString &filePath);
void releaseStreamer(const QString &filePath);
void releaseStreamer(const QString &udid, const QString &filePath);
void cleanup();
+21 -4
View File
@@ -1,7 +1,7 @@
use cxx_qt::Threading;
use cxx_qt_lib::{QByteArray, QList, QMap, QMapPair_QString_QVariant, QString};
use crate::{APP_DEVICE_STATE, RUNTIME, VIDEO_STREAMS, afc, run_sync};
use crate::{APP_DEVICE_STATE, RUNTIME, afc, run_sync};
use idevice::afc::{AfcClient, opcode::AfcFopenMode};
use once_cell::sync::Lazy;
use regex::Regex;
@@ -700,10 +700,27 @@ impl qobject::Afc2Backend {
let url = format!("http://127.0.0.1:{}/{}", port, encoded);
let url_clone = url.clone();
let url_clone_for_log = url.clone();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
{
let mut map = VIDEO_STREAMS.lock().unwrap();
map.insert(url.clone(), shutdown_tx);
let udid_for_insert = udid_str.clone();
let url_for_insert = url.clone();
let inserted = run_sync(async move {
let maybe_device = APP_DEVICE_STATE.lock().await.get(&udid_for_insert).cloned();
let device = match maybe_device {
Some(d) => d,
None => return false,
};
let mut video_streams = device.video_streams.lock().await;
video_streams.insert(url_for_insert, shutdown_tx);
true
});
if !inserted {
eprintln!(
"start_video_stream: failed to insert video stream for udid={} path={}",
udid_str, cloned_path
);
return QString::default();
}
eprintln!(
"start_video_stream: serving {} for udid={} path={}",
+21 -4
View File
@@ -3,7 +3,7 @@ use cxx_qt_lib::{
QByteArray, QDateTime, QList, QMap, QMapPair_QString_QVariant, QString, QTimeZone, QVariant,
};
use crate::{APP_DEVICE_STATE, RUNTIME, VIDEO_STREAMS, afc, run_sync};
use crate::{APP_DEVICE_STATE, RUNTIME, afc, run_sync};
use idevice::{
IdeviceService,
afc::{AfcClient, opcode::AfcFopenMode},
@@ -628,10 +628,27 @@ impl qobject::AfcBackend {
let url_clone = url.clone();
let url_clone_for_log = url.clone();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
{
let mut map = VIDEO_STREAMS.lock().unwrap();
map.insert(url.clone(), shutdown_tx);
let udid_for_insert = udid_str.clone();
let url_for_insert = url.clone();
let inserted = run_sync(async move {
let maybe_device = APP_DEVICE_STATE.lock().await.get(&udid_for_insert).cloned();
let device = match maybe_device {
Some(d) => d,
None => return false,
};
let mut video_streams = device.video_streams.lock().await;
video_streams.insert(url_for_insert, shutdown_tx);
true
});
if !inserted {
eprintln!(
"start_video_stream: failed to insert video stream for udid={} path={}",
udid_str, cloned_path
);
return QString::default();
}
eprintln!(
"start_video_stream: serving {} for udid={} path={}",
url_clone, udid_str, cloned_path
+17 -4
View File
@@ -1,4 +1,4 @@
use crate::{APP_DEVICE_STATE, RUNTIME, VIDEO_STREAMS, afc, run_sync, utils};
use crate::{APP_DEVICE_STATE, RUNTIME, afc, run_sync, utils};
use cxx_qt::{CxxQtType, Threading};
use cxx_qt_lib::{QByteArray, QMap, QMapPair_QString_QVariant, QString};
use idevice::afc::{AfcClient, opcode::AfcFopenMode};
@@ -292,9 +292,22 @@ impl qobject::HauseArrest {
let url_clone = url.clone();
let url_clone_for_log = url.clone();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
{
let mut map = VIDEO_STREAMS.lock().unwrap();
map.insert(url.clone(), shutdown_tx);
let udid_for_insert = udid_str.clone();
let url_for_insert = url.clone();
let inserted = run_sync(async move {
let maybe_device = APP_DEVICE_STATE.lock().await.get(&udid_for_insert).cloned();
let device = match maybe_device {
Some(d) => d,
None => return false,
};
let mut video_streams = device.video_streams.lock().await;
video_streams.insert(url_for_insert, shutdown_tx);
true
});
if !inserted {
eprintln!("start_video_stream: failed to insert video stream for udid={} path={}", udid_str, cloned_path);
return QString::default();
}
eprintln!(
"start_video_stream: serving {} for udid={} path={}",
+17 -11
View File
@@ -1,4 +1,4 @@
use crate::{APP_DEVICE_STATE, RUNTIME, VIDEO_STREAMS, utils};
use crate::{APP_DEVICE_STATE, RUNTIME, run_sync, utils};
use cxx_qt::{CxxQtType, Threading};
use cxx_qt_lib::QUuid;
use idevice::{IdeviceService, afc::AfcClient, services::afc::opcode::AfcFopenMode};
@@ -144,7 +144,7 @@ mod qobject {
);
#[qinvokable]
fn release_video_streamer(self: &IOManager, url: &QString);
fn release_video_streamer(self: &IOManager, udid: &QString, url: &QString);
}
impl cxx_qt::Threading for IOManager {}
@@ -695,20 +695,26 @@ impl qobject::IOManager {
}
}
fn release_video_streamer(&self, url: &qobject::QString) {
let s = url.to_string();
fn release_video_streamer(&self, udid: &qobject::QString, url: &qobject::QString) {
let udid_str = udid.to_string();
let url_str = url.to_string();
let url_str_clone = url_str.clone();
let tx_opt = run_sync(async move {
let mut state = APP_DEVICE_STATE.lock().await;
let Some(device) = state.get_mut(&udid_str) else {
eprintln!("release_streamer: device {udid_str} not found");
return None;
};
let tx_opt = {
let mut map = VIDEO_STREAMS.lock().unwrap();
map.remove(&s)
};
let mut streams = device.video_streams.lock().await;
streams.remove(&url_str)
});
if let Some(tx) = tx_opt {
eprintln!("release_streamer: sending shutdown for URL: {s}");
// ignore if receiver is already gone
eprintln!("release_streamer: sending shutdown for URL: {url_str_clone}");
let _ = tx.send(());
} else {
eprintln!("release_streamer: no streamer found for URL: {s}");
eprintln!("release_streamer: no streamer found for URL: {url_str_clone}");
}
}
}
+13 -28
View File
@@ -10,7 +10,6 @@ use idevice::{
IdeviceError,
usbmuxd::{Connection, UsbmuxdAddr, UsbmuxdConnection, UsbmuxdListenEvent},
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{any::type_name, sync::Arc};
use std::{collections::HashMap, net::IpAddr};
use tokio::sync::Mutex;
@@ -51,7 +50,7 @@ pub struct DeviceServices {
pub afc2: Option<Arc<Mutex<AfcClient>>>,
pub diag: Arc<Mutex<DiagnosticsRelayClient>>,
pub heartbeat_task: Option<Arc<JoinHandle<()>>>,
pub video_streams: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
pub video_streams: Arc<Mutex<HashMap<String, oneshot::Sender<()>>>>,
pub provider: Arc<Mutex<Box<dyn idevice::provider::IdeviceProvider>>>,
pub lockdown: Arc<Mutex<LockdownClient>>,
}
@@ -67,9 +66,6 @@ static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
.unwrap()
});
static VIDEO_STREAMS: Lazy<std::sync::Mutex<HashMap<String, oneshot::Sender<()>>>> =
Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
pub fn run_sync<F, R>(fut: F) -> R
where
F: Future<Output = R> + Send + 'static,
@@ -463,7 +459,7 @@ impl qobject::Core {
let result = tokio::select! {
res = init_idescriptor_device(t, qt_t.clone()) => res,
// timeout
_ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
_ = tokio::time::sleep(tokio::time::Duration::from_secs(20)) => {
eprintln!("Timeout collecting device info for wireless device mac address: {mac_address_owned}");
None
}
@@ -574,9 +570,13 @@ impl qobject::Core {
async fn clean_device_from_app_state(udid: &str) {
let mut state = APP_DEVICE_STATE.lock().await;
if let Some(svc) = state.remove(udid) {
let streams = svc.video_streams.lock().await;
for (_path, flag) in streams.iter() {
flag.store(true, Ordering::Relaxed);
if let Some(t) = &svc.heartbeat_task {
t.abort();
}
let mut streams = svc.video_streams.lock().await;
for (_url, tx) in streams.drain() {
let _ = tx.send(());
}
println!("Removed device with UDID {}", udid);
} else {
@@ -605,14 +605,14 @@ async fn init_idescriptor_device<
let mut lc = match LockdownClient::connect(&provider).await {
Ok(lc) => lc,
Err(e) => {
eprintln!("wireless: LockdownClient::connect failed : {e:?}");
eprintln!("LockdownClient::connect failed : {e:?}");
return None;
}
};
eprintln!("init_idescriptor_device: Attempting to start Lockdown session.");
if let Err(e) = lc.start_session(&pf).await {
eprintln!("wireless: start_session failed: {e:?}");
eprintln!("start_session failed: {e:?}");
return None;
}
eprintln!("init_idescriptor_device: Lockdown session started.");
@@ -621,7 +621,7 @@ async fn init_idescriptor_device<
let mut def_vals = match lc.get_value(None, None).await {
Ok(v) => v,
Err(e) => {
eprintln!("wireless: get_value(None, None) failed : {e:?}");
eprintln!("get_value(None, None) failed : {e:?}");
return None;
}
};
@@ -651,24 +651,10 @@ async fn init_idescriptor_device<
eprintln!("init_idescriptor_device: Connected to HeartbeatClient.");
}
// FIXME: this cannot be done when paired wirelessly
// lockdownd_set_value(device->lockdown, "EnableWifiConnections",
// // value, "com.apple.mobile.wireless_lockdown");
// let value = Value::Boolean(true);
// match lc.set_value("EnableWifiConnections", value, Some("com.apple.mobile.wireless_lockdown")).await {
// Ok(_) => {},
// Err(e) => {
// eprintln!("wireless: LockdownClient::set_value failed: {e:?}");
// return None;
// // continue anyway, as this might not be critical
// }
// }
let disk_vals = match lc.get_value(None, Some("com.apple.disk_usage")).await {
Ok(v) => v,
Err(e) => {
eprintln!("wireless: get_value(com.apple.disk_usage) failed: {e:?}");
eprintln!("get_value(com.apple.disk_usage) failed: {e:?}");
return None;
}
};
@@ -692,7 +678,6 @@ async fn init_idescriptor_device<
}
};
eprintln!("init_idescriptor_device: Connected to DiagnosticsRelayClient.");
// afc_client.set_timeout(Some(5000)).await;
let afc2 = match AfcClient::new_afc2(&provider).await {
Ok(c) => Some(Arc::new(Mutex::new(c))),
+58 -1
View File
@@ -130,6 +130,13 @@ mod qobject {
#[qsignal]
fn install_ipa_progress(self: Pin<&mut ServiceManager>, progress: f64, state: QString);
#[qinvokable]
fn enable_wifi_connections(&self);
#[qsignal]
fn enable_wifi_connections_result(self: Pin<&mut ServiceManager>, success: bool);
}
impl cxx_qt::Threading for ServiceManager {}
@@ -793,7 +800,7 @@ impl qobject::ServiceManager {
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
eprintln!("set_location: timed out");
-71
idevice::IdeviceError::Timeout.code()
}
}
})
@@ -1133,6 +1140,56 @@ impl qobject::ServiceManager {
}
});
}
fn enable_wifi_connections(&self) {
let qt_t = self.qt_thread();
let udid = self.get_udid().to_string();
RUNTIME.spawn(async move {
let qt_thread = qt_t.clone();
let lc_arc = {
let maybe_device = APP_DEVICE_STATE
.lock()
.await
.get(udid.as_str())
.cloned();
let device = match maybe_device {
Some(d) => d,
None => {
eprintln!("enable_wifi_connections: device {udid} not found");
let _ = qt_thread.queue(|t| {
t.enable_wifi_connections_result(false);
}).ok();
return;
}
};
device.lockdown.clone()
};
let mut lc = lc_arc.lock().await;
let value = Value::Boolean(true);
match lc.set_value("EnableWifiConnections", value, Some("com.apple.mobile.wireless_lockdown")).await {
Ok(_) => {
let _ = qt_thread.queue(|t| {
t.enable_wifi_connections_result(true);
}).ok();
},
Err(e) => {
eprintln!("wireless: LockdownClient::set_value failed: {e:?}");
let _ = qt_thread.queue(|t| {
t.enable_wifi_connections_result(false);
}).ok();
}
}
});
}
}
async fn set_device_location_lockdown(