Skip to content

Commit

Permalink
feat: generating event from passthrough (#244)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Feb 14, 2025
1 parent 1ad244f commit 92c43dc
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 5 deletions.
153 changes: 148 additions & 5 deletions api/src/logic/passthrough.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,25 @@ use axum::{
routing::get,
Extension, Router,
};
use bson::doc;
use chrono::Utc;
use entities::{
constant::PICA_PASSTHROUGH_HEADER,
destination::{Action, Destination},
encrypted_access_key::EncryptedAccessKey,
event_access::EventAccess,
ApplicationError, InternalError, QUERY_BY_ID_PASSTHROUGH,
prefix::IdPrefix,
AccessKey, ApplicationError, Event, Id, InternalError, Store, META, PASSWORD_LENGTH,
QUERY_BY_ID_PASSTHROUGH,
};
use http::{header::CONTENT_LENGTH, HeaderMap, HeaderName, Method, Uri};
use hyper::body::Bytes;
use mongodb::options::FindOneOptions;
use serde::Deserialize;
use serde_json::json;
use std::{collections::HashMap, sync::Arc};
use tracing::error;
use unified::domain::UnifiedMetadataBuilder;

pub fn get_router() -> Router<Arc<AppState>> {
Router::new().route(
Expand Down Expand Up @@ -43,6 +52,18 @@ pub async fn passthrough_request(
));
};

let Some(connection_secret_header) = headers.get(&state.config.headers.auth_header) else {
return Err(ApplicationError::bad_request(
"Connection header not found",
None,
));
};

let host = headers.get("host");
let host = host.and_then(|h| h.to_str().map(|s| s.to_string()).ok());

let connection_secret_header = connection_secret_header.clone();

let connection = get_connection(
user_event_access.as_ref(),
connection_key_header,
Expand All @@ -61,7 +82,7 @@ pub async fn passthrough_request(
platform: connection.platform.clone(),
action: Action::Passthrough {
path: uri.path().into(),
method,
method: method.clone(),
id: id.map(|i| i.into()),
},
connection_key: connection.key.clone(),
Expand All @@ -77,7 +98,7 @@ pub async fn passthrough_request(
.dispatch_destination_request(
Some(connection.clone()),
&destination,
headers,
headers.clone(),
query_params,
Some(body.to_vec()),
)
Expand Down Expand Up @@ -106,7 +127,114 @@ pub async fn passthrough_request(
}
});

let status = model_execution_result.status();
let connection_platform = connection.platform.to_string();
let connection_platform_version = connection.platform_version.to_string();
let connection_key = connection.key.to_string();
let request_headers = headers.clone();
let request_status_code = model_execution_result.status();

let database_c = state.app_stores.db.clone();
let event_access_pass_c = state.config.event_access_password.clone();
let event_tx_c = state.event_tx.clone();

tokio::spawn(async move {
let connection_secret_header: Option<String> =
connection_secret_header.to_str().map(|a| a.to_owned()).ok();

let options = FindOneOptions::builder()
.projection(doc! {
"connectionPlatform": 1,
"connectionDefinitionId": 1,
"platformVersion": 1,
"key": 1,
"title": 1,
"name": 1,
"path": 1,
"action": 1,
"actionName": 1
})
.build();

let cmd = database_c
.collection::<SparseCMD>(&Store::ConnectionModelDefinitions.to_string())
.find_one(doc! {
"connectionPlatform": connection_platform.clone(),
"path": uri.path().to_string(),
"action": method.to_string().to_uppercase()
})
.with_options(options)
.await
.ok()
.flatten();

if let (Some(cmd), Some(encrypted_access_key)) = (cmd, connection_secret_header) {
if let Ok(encrypted_access_key) = EncryptedAccessKey::parse(&encrypted_access_key) {
let path = uri.path().trim_end_matches('/');

let metadata = UnifiedMetadataBuilder::default()
.timestamp(Utc::now().timestamp_millis())
.platform_rate_limit_remaining(0)
.rate_limit_remaining(0)
.transaction_key(Id::now(IdPrefix::Transaction))
.platform(connection_platform.clone())
.platform_version(connection_platform_version.clone())
.common_model_version("v1")
.connection_key(connection_key)
.action(cmd.title)
.host(host)
.path(path.to_string())
.status_code(request_status_code)
.build()
.ok()
.map(|m| m.as_value());

let password: Option<[u8; PASSWORD_LENGTH]> =
event_access_pass_c.as_bytes().try_into().ok();

match password {
Some(password) => {
let access_key = AccessKey::parse(&encrypted_access_key, &password).ok();

let event_name = format!(
"{}::{}::{}::{}",
connection_platform,
connection_platform_version,
cmd.name,
cmd.action_name
);

let name = if request_status_code.is_success() {
format!("{event_name}::request-succeeded",)
} else {
format!("{event_name}::request-failed",)
};

let body = serde_json::to_string(&json!({
META: metadata,
}))
.unwrap_or_default();

if let Some(access_key) = access_key {
let event = Event::new(
&access_key,
&encrypted_access_key,
&name,
request_headers.clone(),
body,
);

if let Err(e) = event_tx_c.send(event).await {
error!("Could not send event to receiver: {e}");
}
} else {
tracing::error!("Error generating event for passthrough")
}
}
None => tracing::error!("Error generating event for passthrough"),
};
}
};
});

let metric = Metric::passthrough(connection);
if let Err(e) = state.metric_tx.send(metric).await {
Expand All @@ -122,5 +250,20 @@ pub async fn passthrough_request(
InternalError::script_error("Error retrieving bytes from response", None)
})?;

Ok((status, headers, bytes))
Ok((request_status_code, headers, bytes))
}

#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SparseCMD {
pub connection_platform: String,
pub connection_definition_id: Id,
pub platform_version: String,
pub key: String,
pub title: String,
pub name: String,
pub path: String,
#[serde(with = "http_serde_ext_ios::method")]
pub action: Method,
pub action_name: String,
}
6 changes: 6 additions & 0 deletions unified/src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use bson::doc;
use derive_builder::Builder;
use entities::Id;
use entities::PicaError;
use http::StatusCode;
use http::{HeaderMap, HeaderName, HeaderValue};
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand Down Expand Up @@ -173,6 +174,11 @@ pub struct UnifiedMetadata {
#[builder(default)]
common_model: Option<String>,
common_model_version: String,
#[builder(default)]
#[serde(with = "http_serde_ext_ios::status_code::option")]
status_code: Option<StatusCode>,
#[builder(default)]
path: Option<String>,
connection_key: String,
#[builder(setter(strip_option), default)]
latency: Option<i32>,
Expand Down

0 comments on commit 92c43dc

Please sign in to comment.