Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,6 @@ jobs:
aws-region: eu-west-1
audience: sts.amazonaws.com

- name: Create OTA Job
run: |
./scripts/create_ota.sh

- name: Integration Tests
uses: actions-rs/cargo@v1
with:
Expand All @@ -130,8 +126,3 @@ jobs:
IDENTITY_PASSWORD: ${{ secrets.IDENTITY_PASSWORD }}
AWS_HOSTNAME: a1vq3mi5y3c6j5-ats.iot.eu-west-1.amazonaws.com
RUST_LOG: debug

- name: Cleanup OTA Jobs
if: ${{ always() }}
run: |
./scripts/cleanup_ota.sh
9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ p256 = "0.13"
pkcs8 = { version = "0.10", features = ["encryption", "pem"] }
hex = { version = "0.4.3", features = ["alloc"] }

aws-config = "1"
aws-sdk-sts = "1"
aws-sdk-s3 = "1"
aws-sdk-iot = "1"
aws-credential-types = "1"
uuid = { version = "1", features = ["v4"] }
base64 = "0.22"
serial_test = "3"


[features]
default = ["ota_mqtt_data", "metric_cbor", "provision_cbor", "shadows_kv_persist"]
Expand Down
98 changes: 74 additions & 24 deletions src/jobs/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ use serde::Serialize;
use crate::jobs::{data_types::JobStatus, MAX_CLIENT_TOKEN_LEN};
use crate::mqtt::{PayloadError, ToPayload};

use super::StatusDetailsOwned;

/// Updates the status of a job execution. You can optionally create a step
/// timer by setting a value for the stepTimeoutInMinutes property. If you don't
/// update the value of this property by running UpdateJobExecution again, the
/// job execution times out when the step timer expires.
///
/// Topic: $aws/things/{thingName}/jobs/{jobId}/update
///
/// The type parameter `S` allows any serializable type for status_details,
/// enabling different consumers to provide different status structures.
#[derive(Debug, PartialEq, Serialize)]
pub struct UpdateJobExecutionRequest<'a> {
pub struct UpdateJobExecutionRequest<'a, S: Serialize = ()> {
/// Optional. A number that identifies a particular job execution on a
/// particular device.
#[serde(rename = "executionNumber")]
Expand Down Expand Up @@ -42,21 +43,19 @@ pub struct UpdateJobExecutionRequest<'a> {
/// REJECTED). This must be specified on every update.
#[serde(rename = "status")]
pub status: JobStatus,
// / Optional. A collection of name/value pairs that describe the status of
// the job execution. If not specified, the statusDetails are unchanged.
/// Optional. A collection of name/value pairs that describe the status of
/// the job execution. If not specified, the statusDetails are unchanged.
#[serde(rename = "statusDetails")]
#[serde(skip_serializing_if = "Option::is_none")]
pub status_details: Option<&'a StatusDetailsOwned>,
// Specifies the amount of time this device has to finish execution of this
// job. If the job execution status is not set to a terminal state before
// this timer expires, or before the timer is reset (by again calling
// <code>UpdateJobExecution</code>, setting the status to
// <code>IN_PROGRESS</code> and specifying a new timeout value in this
// field) the job execution status will be automatically set to
// <code>TIMED_OUT</code>. Note that setting or resetting this timeout has
// no effect on that job execution timeout which may have been specified
// when the job was created (<code>CreateJob</code> using field
// <code>timeoutConfig</code>).
pub status_details: Option<&'a S>,
/// Specifies the amount of time this device has to finish execution of this
/// job. If the job execution status is not set to a terminal state before
/// this timer expires, or before the timer is reset (by again calling
/// UpdateJobExecution, setting the status to IN_PROGRESS and specifying a
/// new timeout value in this field) the job execution status will be
/// automatically set to TIMED_OUT. Note that setting or resetting this
/// timeout has no effect on that job execution timeout which may have been
/// specified when the job was created (CreateJob using field timeoutConfig).
#[serde(rename = "stepTimeoutInMinutes")]
#[serde(skip_serializing_if = "Option::is_none")]
pub step_timeout_in_minutes: Option<i64>,
Expand All @@ -67,18 +66,24 @@ pub struct UpdateJobExecutionRequest<'a> {
pub client_token: Option<&'a str>,
}

pub struct Update<'a> {
/// Builder for job execution update requests.
///
/// The type parameter `S` represents the status details type. Use
/// [`status_details`](Self::status_details) to set the status details and
/// change the type parameter.
pub struct Update<'a, S: Serialize = ()> {
status: JobStatus,
client_token: Option<&'a str>,
status_details: Option<&'a StatusDetailsOwned>,
status_details: Option<&'a S>,
include_job_document: bool,
execution_number: Option<i64>,
include_job_execution_state: bool,
expected_version: Option<i64>,
step_timeout_in_minutes: Option<i64>,
}

impl<'a> Update<'a> {
impl<'a> Update<'a, ()> {
/// Create a new Update builder with the given job status.
pub fn new(status: JobStatus) -> Self {
Self {
status,
Expand All @@ -91,7 +96,10 @@ impl<'a> Update<'a> {
step_timeout_in_minutes: None,
}
}
}

impl<'a, S: Serialize> Update<'a, S> {
/// Set the client token for request correlation.
pub fn client_token(self, client_token: &'a str) -> Self {
assert!(client_token.len() < MAX_CLIENT_TOKEN_LEN);

Expand All @@ -101,41 +109,56 @@ impl<'a> Update<'a> {
}
}

pub fn status_details(self, status_details: &'a StatusDetailsOwned) -> Self {
Self {
/// Set the status details.
///
/// This method accepts any type that implements `Serialize`, allowing
/// different consumers to provide different status detail structures.
pub fn status_details<T: Serialize>(self, status_details: &'a T) -> Update<'a, T> {
Update {
status: self.status,
client_token: self.client_token,
status_details: Some(status_details),
..self
include_job_document: self.include_job_document,
execution_number: self.execution_number,
include_job_execution_state: self.include_job_execution_state,
expected_version: self.expected_version,
step_timeout_in_minutes: self.step_timeout_in_minutes,
}
}

/// Include the job document in the response.
pub fn include_job_document(self) -> Self {
Self {
include_job_document: true,
..self
}
}

/// Include the job execution state in the response.
pub fn include_job_execution_state(self) -> Self {
Self {
include_job_execution_state: true,
..self
}
}

/// Set the execution number.
pub fn execution_number(self, execution_number: i64) -> Self {
Self {
execution_number: Some(execution_number),
..self
}
}

/// Set the expected version for optimistic locking.
pub fn expected_version(self, expected_version: i64) -> Self {
Self {
expected_version: Some(expected_version),
..self
}
}

/// Set the step timeout in minutes.
pub fn step_timeout_in_minutes(self, step_timeout_in_minutes: i64) -> Self {
Self {
step_timeout_in_minutes: Some(step_timeout_in_minutes),
Expand All @@ -144,7 +167,7 @@ impl<'a> Update<'a> {
}
}

impl ToPayload for Update<'_> {
impl<S: Serialize> ToPayload for Update<'_, S> {
fn max_size(&self) -> usize {
512
}
Expand Down Expand Up @@ -176,7 +199,7 @@ mod test {

#[test]
fn serialize_requests() {
let req = UpdateJobExecutionRequest {
let req: UpdateJobExecutionRequest<'_, ()> = UpdateJobExecutionRequest {
client_token: Some("test_client:token_update"),
step_timeout_in_minutes: Some(50),
execution_number: Some(5),
Expand Down Expand Up @@ -216,4 +239,31 @@ mod test {
"$aws/things/test_client/jobs/test_job_id/update"
);
}

#[test]
fn serialize_with_status_details() {
// Test with a custom status details struct
#[derive(Serialize)]
struct TestStatus {
self_test: &'static str,
progress: &'static str,
}

let status = TestStatus {
self_test: "receiving",
progress: "10/100",
};

let update = Update::new(JobStatus::InProgress)
.client_token("test_client")
.status_details(&status);

let mut buf = [0u8; 512];
let len = update.encode(&mut buf).unwrap();

let json = core::str::from_utf8(&buf[..len]).unwrap();
assert!(json.contains(r#""self_test":"receiving""#));
assert!(json.contains(r#""progress":"10/100""#));
assert!(json.contains(r#""status":"IN_PROGRESS""#));
}
}
10 changes: 5 additions & 5 deletions src/mqtt/mqttrust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<P: ToPayload> mqttrust::ToPayload for PayloadBridge<P> {
}

/// Convert our [`QoS`] to mqttrust's `QoS`.
pub fn to_embedded_qos(qos: QoS) -> mqttrust::QoS {
pub fn to_mqttrust_qos(qos: QoS) -> mqttrust::QoS {
match qos {
QoS::AtMostOnce => mqttrust::QoS::AtMostOnce,
QoS::AtLeastOnce => mqttrust::QoS::AtLeastOnce,
Expand All @@ -53,7 +53,7 @@ pub fn to_embedded_qos(qos: QoS) -> mqttrust::QoS {
}

/// Convert mqttrust's `QoS` to our [`QoS`].
pub fn from_embedded_qos(qos: mqttrust::QoS) -> QoS {
pub fn from_mqttrust_qos(qos: mqttrust::QoS) -> QoS {
match qos {
mqttrust::QoS::AtMostOnce => QoS::AtMostOnce,
_ => QoS::AtLeastOnce,
Expand All @@ -76,7 +76,7 @@ impl<'a, M: RawMutex, B: BufferProvider> MqttMessage for mqttrust::Message<'a, M
}

fn qos(&self) -> QoS {
from_embedded_qos(self.qos_pid().qos())
from_mqttrust_qos(self.qos_pid().qos())
}

fn dup(&self) -> bool {
Expand Down Expand Up @@ -130,7 +130,7 @@ impl<'a, M: RawMutex> crate::mqtt::MqttClient for mqttrust::MqttClient<'a, M> {
let publish = mqttrust::Publish::builder()
.topic_name(topic)
.payload(PayloadBridge(payload))
.qos(to_embedded_qos(options.qos))
.qos(to_mqttrust_qos(options.qos))
.retain(options.retain)
.dup(options.dup)
.build();
Expand All @@ -144,7 +144,7 @@ impl<'a, M: RawMutex> crate::mqtt::MqttClient for mqttrust::MqttClient<'a, M> {
let subscribe_topics: [mqttrust::SubscribeTopic<'_>; N] = core::array::from_fn(|i| {
mqttrust::SubscribeTopic::builder()
.topic_path(topics[i].0)
.maximum_qos(to_embedded_qos(topics[i].1))
.maximum_qos(to_mqttrust_qos(topics[i].1))
.build()
});

Expand Down
6 changes: 4 additions & 2 deletions src/ota/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use embassy_time::Duration;

pub struct Config {
pub block_size: usize,
pub max_blocks_per_request: u32,
pub max_request_momentum: u8,
pub request_wait: Duration,
pub status_update_frequency: u32,
Expand All @@ -11,10 +12,11 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Self {
block_size: 256,
block_size: 4096,
max_blocks_per_request: 16,
max_request_momentum: 3,
request_wait: Duration::from_secs(5),
status_update_frequency: 96,
status_update_frequency: 24,
self_test_timeout: None,
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/ota/control_interface/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::jobs::data_types::JobStatus;
use crate::ota::status_details::StatusDetailsExt;

use super::{
encoding::{json::JobStatusReason, FileContext},
Expand All @@ -11,10 +12,10 @@ pub mod mqtt;
// Interfaces required for OTA
pub trait ControlInterface {
async fn request_job(&self) -> Result<(), OtaError>;
async fn update_job_status(
async fn update_job_status<E: StatusDetailsExt>(
&self,
file_ctx: &FileContext,
progress: &mut ProgressState,
progress: &mut ProgressState<E>,
status: JobStatus,
reason: JobStatusReason,
) -> Result<(), OtaError>;
Expand Down
Loading