Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
666 changes: 68 additions & 598 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions pollinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ bb8-postgres.workspace = true
bytes = "1.11.1"
env_logger.workspace = true
http-body-util = "0.1"
hyper-util = { version = "0.1", features = ["tokio"] }
hyper-util = { version = "0.1", features = ["http1", "client-legacy", "tokio"] }
hyper = { version = "1.8", features = ["client", "http1"] }
hyper-tls = "0.6"
jiff.workspace = true
log.workspace = true
resin = { path = "../resin" }
Expand All @@ -26,9 +27,7 @@ tokio-stream.workspace = true
tokio-tungstenite.workspace = true
tungstenite.workspace = true
whoami.workspace = true
postgres = { version = "0.19.9", features = ["with-serde_json-1"] }
openssl = { version = "0.10", features = ["vendored"] }
reqwest = { version = "0.11", features = ["json", "blocking"] }

[dependencies.futures-util]
version = "0.3.28"
Expand Down
5 changes: 1 addition & 4 deletions pollinator/src/comm_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,7 @@ async fn try_run_link(link: &CommLink, db: Option<Database>) -> Result<()> {
sensor.run(db).await
}
Some(CommProtocol::CampbellCloud) => {
tokio::task::spawn_blocking(|| {
rwis_api::run().expect("rwis_api run failed");
}).await?;
Ok(())
rwis_api::run(db).await
}
_ => Err(Error::InvalidConfig("protocol"))
}
Expand Down
75 changes: 74 additions & 1 deletion pollinator/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use http_body_util::{BodyExt, Empty};
use hyper::body::Incoming;
use hyper::header::{AUTHORIZATION, HeaderValue};
use hyper::{Request, Response, Uri};
use hyper_util::rt::TokioIo;
use hyper_tls::HttpsConnector;
use hyper_util::rt::{TokioExecutor, TokioIo};
use resin::{Error, Result};
use tokio::net::TcpStream;

Expand All @@ -28,6 +29,15 @@ pub struct Client {
bearer_token: Option<String>,
}

/// HTTP client requestor
#[derive(Clone, Debug, Default)]
pub struct HttpsClient {
/// URI address
uri: String,
/// Bearer token
bearer_token: Option<String>,
}

impl Client {
/// Make a new HTTP client
pub fn new(uri: &str) -> Self {
Expand Down Expand Up @@ -93,6 +103,69 @@ impl Client {
}
}

impl HttpsClient {
/// Make a new HTTP client
pub fn new(uri: &str) -> Self {
let uri = uri.to_string();
HttpsClient {
uri,
bearer_token: None,
}
}

pub fn host(&self) -> Result<String> {
let uri = self.uri.parse::<Uri>()?;
Ok(uri.host().ok_or(Error::InvalidConfig("host"))?.to_string())
}

/// Set bearer token
pub fn set_bearer_token(&mut self, bearer_token: String) {
self.bearer_token = Some(bearer_token);
}

/// Make a `GET` request
pub async fn get(&self, path: &str) -> Result<Vec<u8>> {
let host = self.host()?;
let uri = format!("https://{host}/{path}").parse::<Uri>()?;
log::debug!("HTTPS GET to {uri}");

let https = HttpsConnector::new();
let client = hyper_util::client::legacy::Client
::builder(TokioExecutor::new())
.build::<_, _>(https);

let mut req = Request::get(uri);
if let Some(token) = &self.bearer_token {
req = req.header(AUTHORIZATION, HeaderValue::from_str(&("Bearer ".to_owned() + token))?);
}
let req = req.body(Empty::<Bytes>::new())?;

let res = client.request(req).await?;
let body = parse_response(res).await?;
Ok(body)
}

/// Make an http `POST` request (JSON)
pub async fn post(&self, path: &str, body: &str) -> Result<Vec<u8>> {
let host = self.host()?;
let uri = format!("https://{host}/{path}").parse::<Uri>()?;
log::debug!("HTTPS POST to {uri}");

let https = HttpsConnector::new();
let client = hyper_util::client::legacy::Client
::builder(TokioExecutor::new())
.build::<_, _>(https);

let req = Request::post(uri)
.header("content-type", "application/json")
.body(body.to_string())?;

let res = client.request(req).await?;
let body = parse_response(res).await?;
Ok(body)
}
}

/// Parse HTTP response
async fn parse_response(mut res: Response<Incoming>) -> Result<Vec<u8>> {
let status = res.status();
Expand Down
120 changes: 59 additions & 61 deletions pollinator/src/rwis_api/api_utility.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
use reqwest::blocking::Client;
use reqwest::Error;
use serde::{Deserialize};
use serde_json::{Value, json};
use resin::{Error, Result};
use std::time::{Duration, SystemTime};
use crate::http;

#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
struct Auth {
access_token: String,
expires_in: u64,
}

pub struct ApiUtility {
client: Client,
base_url: String,
client: http::HttpsClient,
username: String,
password: String,
auth: Auth,
auth: Option<Auth>,
auth_time: SystemTime,
organization_id: String,
assets: Option<Value>,
Expand All @@ -24,36 +23,36 @@ pub struct ApiUtility {

impl ApiUtility {
/** Request new authorization tokens from the API */
fn get_auth(base_url: &str, u: &str, p: &str) -> Auth {
let auth_url = format!("{}/api/v1/tokens", base_url);
let raw_response = Client::new()
.post(&auth_url)
.json(&json!({
"client_id": "cloud",
"grant_type": "password",
"username": u,
"password": p
}))
.send()
.expect("Failed to send auth request")
.text()
.expect("Failed to read response body");

serde_json::from_str(&raw_response)
.expect("Failed to parse auth response as JSON")
async fn get_auth(client: http::HttpsClient, u: &str, p: &str) -> Result<Auth> {
let body = format!("{{\
\"username\": \"{u}\", \
\"password\": \"{p}\", \
\"client_id\": \"cloud\", \
\"grant_type\": \"password\" \
}}");
let resp = client.post("api/v1/tokens", &body).await?;
Ok(serde_json::from_slice(&resp)?)
}

/**
* Create a new ApiUtility class.
* Used to access API programmatically and update the auth if needed.
*/
pub fn new(base_url: &str, username: &str, password: &str, organization_id: &str) -> Self {
pub async fn new(base_url: &str, username: &str, password: &str, organization_id: &str) -> Self {
let mut c = http::HttpsClient::new(base_url);
let a_res = Self::get_auth(c.clone(), username, password).await;
let a_opt = match a_res {
Ok(a) => {
c.set_bearer_token(a.access_token.clone());
Some(a)
},
Err(_) => None
};
ApiUtility {
client: Client::new(),
base_url: base_url.to_string(),
client: c,
username: username.to_string(),
password: password.to_string(),
auth: Self::get_auth(base_url, username, password),
auth: a_opt,
auth_time: SystemTime::now(),
organization_id: organization_id.to_string(),
assets: None,
Expand All @@ -62,78 +61,77 @@ impl ApiUtility {
}

/** Check if the auth has expired, and refresh it if so */
fn update_auth(&mut self) {
let now = SystemTime::now();
if now.duration_since(self.auth_time).unwrap_or_default() < Duration::from_secs(self.auth.expires_in) {
return;
async fn update_auth(&mut self) -> Result<()> {
if let Some(auth) = &self.auth {
let now = SystemTime::now();
if now.duration_since(self.auth_time).unwrap_or_default() < Duration::from_secs(auth.expires_in) {
return Ok(());
}
self.auth_time = SystemTime::now();
} else {
log::error!("Couldn't authenticate with CampbellCloud API.");
}
self.auth_time = SystemTime::now();
self.auth = Self::get_auth(&self.base_url, &self.username, &self.password);
let a = Self::get_auth(self.client.clone(), &self.username, &self.password).await?;
self.client.set_bearer_token(a.access_token.clone());
self.auth = Some(a);
Ok(())
}

/** Send a GET request defined by the endpoint and return the result if successful */
pub fn get_request(&mut self, endpoint: &str) -> Result<Value, Error> {
self.update_auth();
pub async fn get_request(&mut self, endpoint: &str) -> Result<Value> {
self.update_auth().await?;

let request_url = format!("{}/{}", self.base_url, endpoint);
let response = self
.client
.get(&request_url)
.bearer_auth(&self.auth.access_token)
.send()?;

if response.status().is_success() {
let json = response.json()?;
Ok(json)
} else {
if let Err(e) = response.error_for_status_ref() {
Err(e)
} else {
Ok(json!("Couldn't parse error."))
let response = self.client.get(endpoint).await?;
match serde_json::from_slice(&response) {
Ok(val) => {
Ok(val)
},
Err(e) => {
return Err(Error::SerdeJson(e))
}
}
}

/** Request to list-datastreams API endpoint */
pub fn list_datastreams(&mut self) -> Result<Value, Error> {
pub async fn list_datastreams(&mut self) -> Result<Value> {
if let Some(ds) = &self.datastreams {
return Ok(ds.to_owned());
}
let endpoint = format!(
"api/v1/organizations/{}/datastreams?limit={}",
self.organization_id, i32::MAX
);
let ds = self.get_request(&endpoint)?;
let ds = self.get_request(&endpoint).await?;
self.datastreams = Some(ds.clone());
Ok(ds)
}

/** Request to get-datastream-datapoints-last API endpoint */
pub fn last_datapoint(&mut self, datastream: &str) -> Result<Value, Error> {
pub async fn last_datapoint(&mut self, datastream: &str) -> Result<Value> {
let endpoint = format!(
"api/v1/organizations/{}/datastreams/{}/datapoints/last",
self.organization_id, datastream
);
self.get_request(&endpoint)
self.get_request(&endpoint).await
}

/** Request to list-assets API endpoint */
pub fn list_assets(&mut self) -> Result<Value, Error> {
pub async fn list_assets(&mut self) -> Result<Value> {
if let Some(a) = &self.assets {
return Ok(a.to_owned());
}
let endpoint = format!(
"api/v1/organizations/{}/assets",
self.organization_id
);
let a = self.get_request(&endpoint)?;
let a = self.get_request(&endpoint).await?;
self.assets = Some(a.clone());
Ok(a)
}

/** Takes a serial number of an asset, and returns the ID for that asset */
pub fn get_id_from_serial(&mut self, s: &str) -> Option<Value> {
if let Ok(assets) = self.list_assets() {
pub async fn get_id_from_serial(&mut self, s: &str) -> Option<Value> {
if let Ok(assets) = self.list_assets().await {
for a in assets.as_array()? {
if a["metadata"]["serial"] == json!(s) {
return Some(a["id"].clone());
Expand All @@ -147,14 +145,14 @@ impl ApiUtility {
* Return the value of the last datapoint of datastream for an asset.
* Wraps last_datapoint, finding the ID by measurement name ("field") and asset.
*/
pub fn get_asset_last_datapoint_value(&mut self, asset_id: &str, datastream_name: &str) -> Result<Value, Error> {
let datastreams = self.list_datastreams();
pub async fn get_asset_last_datapoint_value(&mut self, asset_id: &str, datastream_name: &str) -> Result<Value> {
let datastreams = self.list_datastreams().await;
let mut res = Ok(json!({}));
if let Ok(ds) = datastreams {
for d in ds.as_array().unwrap() {
if d["asset_id"] == json!(asset_id) && d["metadata"]["field"] == json!(datastream_name) {
let id = d["id"].as_str().unwrap();
if let Ok(data) = self.last_datapoint(id) {
if let Ok(data) = self.last_datapoint(id).await {
res = Ok(data["data"][0]["value"].clone());
break;
}
Expand Down
Loading
Loading