Skip to content

Commit

Permalink
[RSDK-9643] ota download timeout (#373)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattjperez authored Jan 7, 2025
1 parent 3baca97 commit 249f2eb
Showing 1 changed file with 100 additions and 74 deletions.
174 changes: 100 additions & 74 deletions micro-rdk/src/common/ota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ use crate::esp32::esp_idf_svc::{
sys::{esp_ota_get_next_update_partition, esp_partition_t},
};
use async_io::Timer;
use futures_lite::{FutureExt, StreamExt};
use futures_util::TryFutureExt;
use http_body_util::{BodyExt, Empty};
use hyper::{body::Bytes, client::conn::http2, Request};
use once_cell::sync::Lazy;
Expand All @@ -61,6 +63,7 @@ use {bincode::Decode, futures_lite::AsyncWriteExt};

const CONN_RETRY_SECS: u64 = 1;
const NUM_RETRY_CONN: usize = 3;
const DOWNLOAD_TIMEOUT_SECS: u64 = 30;
const SIZEOF_APPDESC: usize = 256;
const MAX_VER_LEN: usize = 128;
pub const OTA_MODEL_TYPE: &str = "ota_service";
Expand Down Expand Up @@ -120,6 +123,14 @@ pub(crate) enum ConfigError {
Other(String),
}

#[derive(Error, Debug)]
pub(crate) enum DownloadError {
#[error("resolving next frame took longer than {0} seconds")]
Timeout(usize),
#[error(transparent)]
Network(#[from] hyper::Error),
}

#[allow(dead_code)]
#[derive(Error, Debug)]
pub(crate) enum OtaError<S: OtaMetadataStorage> {
Expand All @@ -129,6 +140,8 @@ pub(crate) enum OtaError<S: OtaMetadataStorage> {
ConfigError(#[from] ConfigError),
#[error(transparent)]
NetworkError(#[from] hyper::Error),
#[error(transparent)]
DownloadError(#[from] DownloadError),
#[error("{0}")]
UpdateError(String),
#[error("failed to initialize ota process")]
Expand Down Expand Up @@ -317,7 +330,6 @@ impl<S: OtaMetadataStorage> OtaService<S> {
Ok(connection) => {
match connection.await {
Ok(io) => {
// TODO(RSDK-9617): add timeout for stalled download
match http2::Builder::new(self.exec.clone())
.max_frame_size(16_384) // lowest configurable
.timer(H2Timer)
Expand Down Expand Up @@ -360,7 +372,7 @@ impl<S: OtaMetadataStorage> OtaService<S> {
.uri(uri)
.body(Empty::<Bytes>::new())
.map_err(|e| OtaError::Other(e.to_string()))?;
let mut response = sender
let response = sender
.send_request(request)
.await
.map_err(|e| OtaError::Other(e.to_string()))?;
Expand Down Expand Up @@ -414,84 +426,98 @@ impl<S: OtaMetadataStorage> OtaService<S> {
let mut got_info = false;

log::info!("writing new firmware to address `{:#x}`", self.address,);
let mut stream = response.into_data_stream();

loop {
match stream
.try_next()
.map_err(DownloadError::Network)
.or(async {
async_io::Timer::after(Duration::from_secs(DOWNLOAD_TIMEOUT_SECS)).await;
Err(DownloadError::Timeout(DOWNLOAD_TIMEOUT_SECS as usize))
})
.await
{
Ok(Some(data)) => {
total_downloaded += data.len();

if !got_info {
if total_downloaded < SIZEOF_APPDESC {
log::error!("initial frame too small to retrieve esp_app_desc_t");
} else {
#[cfg(feature = "esp32")]
{
log::info!("verifying new ota firmware");
let mut loader = EspFirmwareInfoLoader::new();
loader
.load(&data)
.map_err(|e| OtaError::InvalidFirmware(e.to_string()))?;
let new_fw = loader
.get_info()
.map_err(|e| OtaError::InvalidFirmware(e.to_string()))?;
log::debug!(
"current firmware app description: {:?}",
running_fw_info
);
log::debug!("new firmware app description: {:?}", new_fw);
}
#[cfg(not(feature = "esp32"))]
{
log::debug!("deserializing app header");
if let Ok(decoded) = bincode::decode_from_slice::<
EspAppDesc,
bincode::config::Configuration,
>(
&data[..SIZEOF_APPDESC],
bincode::config::standard(),
) {
log::debug!("{:?}", decoded.0);
}
}
got_info = true;
}
}

while let Some(next) = response.frame().await {
let frame = next.map_err(|e| OtaError::Other(e.to_string()))?;
if !frame.is_data() {
return Err(OtaError::Other(
"download contained non-data frame".to_string(),
));
}

// Err variant returns the original frame, not an impl Error
let data = frame
.into_data()
.map_err(|_| OtaError::Other("failed to get data from frame".to_string()))?;
total_downloaded += data.len();
if data.len() + nwritten > self.max_size {
log::error!("file is larger than expected, aborting");
#[cfg(feature = "esp32")]
update_handle
.abort()
.map_err(|e| OtaError::AbortError(format!("{:?}", e)))?;
return Err(OtaError::InvalidImageSize(
data.len() + nwritten,
self.max_size,
));
}

if !got_info {
if total_downloaded < SIZEOF_APPDESC {
log::error!("initial frame too small to retrieve esp_app_desc_t");
} else {
// TODO(RSDK-9271) add async writer for ota
#[cfg(feature = "esp32")]
{
log::info!("verifying new ota firmware");
let mut loader = EspFirmwareInfoLoader::new();
loader
.load(&data)
.map_err(|e| OtaError::InvalidFirmware(e.to_string()))?;
let new_fw = loader
.get_info()
.map_err(|e| OtaError::InvalidFirmware(e.to_string()))?;
log::debug!("current firmware app description: {:?}", running_fw_info);
log::debug!("new firmware app description: {:?}", new_fw);
}
update_handle
.write(&data)
.map_err(|e| OtaError::WriteError(e.to_string()))?;
#[cfg(not(feature = "esp32"))]
{
log::debug!("deserializing app header");
if let Ok(decoded) = bincode::decode_from_slice::<
EspAppDesc,
bincode::config::Configuration,
>(
&data[..SIZEOF_APPDESC], bincode::config::standard()
) {
log::debug!("{:?}", decoded.0);
}
}
got_info = true;
let _n = update_handle
.write(&data)
.await
.map_err(|e| OtaError::WriteError(e.to_string()))?;
// TODO change back to 'n' after impl async writer
nwritten += data.len();
log::info!(
"updating next OTA partition at {:#x}: {}/{} bytes written",
self.address,
nwritten,
file_len
);
}
Ok(None) => break,
Err(e) => {
#[cfg(feature = "esp32")]
update_handle
.abort()
.map_err(|e| OtaError::AbortError(format!("{:?}", e)))?;
return Err(OtaError::DownloadError(e));
}
}

if data.len() + nwritten > self.max_size {
log::error!("file is larger than expected, aborting");
#[cfg(feature = "esp32")]
update_handle
.abort()
.map_err(|e| OtaError::AbortError(format!("{:?}", e)))?;
return Err(OtaError::InvalidImageSize(
data.len() + nwritten,
self.max_size,
));
}

// TODO(RSDK-9271) add async writer for ota
#[cfg(feature = "esp32")]
update_handle
.write(&data)
.map_err(|e| OtaError::WriteError(e.to_string()))?;
#[cfg(not(feature = "esp32"))]
let _n = update_handle
.write(&data)
.await
.map_err(|e| OtaError::WriteError(e.to_string()))?;
// TODO change back to 'n' after impl async writer
nwritten += data.len();
log::info!(
"updating next OTA partition at {:#x}: {}/{} bytes written",
self.address,
nwritten,
file_len
);
}

drop(conn);
Expand Down

0 comments on commit 249f2eb

Please sign in to comment.