Skip to content

Commit

Permalink
Improve worker caching logic (#61)
Browse files Browse the repository at this point in the history
- For preview environments, allow much more to be cached
- Ensure only GET methods are cached

---------

Co-authored-by: Charlotte Andersson <[email protected]>
Co-authored-by: Augusto César <[email protected]>
  • Loading branch information
3 people authored Feb 26, 2024
1 parent 6d89ac2 commit e60a633
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 65 deletions.
4 changes: 3 additions & 1 deletion linkup/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use regex::Regex;
use serde::{Deserialize, Serialize};
use url::Url;

pub const PREVIEW_SESSION_TOKEN: &str = "preview_session";

#[derive(Clone, Debug)]
pub struct Session {
pub session_token: String,
Expand Down Expand Up @@ -315,7 +317,7 @@ pub fn create_preview_req_from_json(input_json: String) -> Result<Session, Confi
Err(e) => Err(ConfigError::JsonFormat(e)),
Ok(c) => {
let server_conf = StorableSession {
session_token: String::from("potato"),
session_token: String::from(PREVIEW_SESSION_TOKEN),
services: c.services,
domains: c.domains,
cache_routes: Some(vec![String::from(".*")]),
Expand Down
151 changes: 87 additions & 64 deletions worker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use http_util::*;
use kv_store::CfWorkerStringStore;
use linkup::{HeaderMap as LinkupHeaderMap, *};
use regex::Regex;
use worker::*;
use ws::linkup_ws_handler;

Expand All @@ -10,6 +9,33 @@ mod kv_store;
mod utils;
mod ws;

#[event(fetch)]
pub async fn main(req: Request, env: Env, _ctx: worker::Context) -> Result<Response> {
// Optionally, get more helpful error messages written to the console in the case of a panic.
utils::set_panic_hook();

let kv = match env.kv("LINKUP_SESSIONS") {
Ok(kv) => kv,
Err(e) => return plaintext_error(format!("Failed to get KV store: {}", e), 500),
};

let string_store = CfWorkerStringStore::new(kv);

let sessions = SessionAllocator::new(&string_store);

if let Ok(Some(upgrade)) = req.headers().get("upgrade") {
if upgrade == "websocket" {
return linkup_ws_handler(req, &sessions).await;
}
}

return match (req.method(), req.path().as_str()) {
(Method::Post, "/linkup") => linkup_session_handler(req, &sessions).await,
(Method::Post, "/preview") => linkup_preview_handler(req, &sessions).await,
_ => linkup_request_handler(req, &sessions).await,
};
}

async fn linkup_session_handler<'a, S: StringStore>(
mut req: Request,
sessions: &'a SessionAllocator<'a, S>,
Expand Down Expand Up @@ -68,48 +94,6 @@ async fn linkup_preview_handler<'a, S: StringStore>(
}
}

async fn get_cached_req(
req: &Request,
cache_routes: &Option<Vec<Regex>>,
) -> Result<Option<Response>> {
let path = req.path();

if let Some(routes) = cache_routes {
if routes.iter().any(|route| route.is_match(&path)) {
let url = req.url()?;
Cache::default().get(url.to_string(), false).await
} else {
Ok(None)
}
} else {
Ok(None)
}
}

async fn set_cached_req(
req: &Request,
mut resp: Response,
cache_routes: Option<Vec<Regex>>,
) -> Result<Response> {
if resp.status_code() != 200 {
return Ok(resp);
}

let path = req.path();

if let Some(routes) = cache_routes {
if routes.iter().any(|route| route.is_match(&path)) {
let url = req.url()?;
let cache_resp = resp.cloned()?;
Cache::default().put(url.to_string(), cache_resp).await?;

return Ok(resp);
}
}

Ok(resp)
}

async fn linkup_request_handler<'a, S: StringStore>(
mut req: Request,
sessions: &'a SessionAllocator<'a, S>,
Expand All @@ -127,8 +111,10 @@ async fn linkup_request_handler<'a, S: StringStore>(
Err(e) => return plaintext_error(format!("Could not find a linkup session for this request. Use a linkup subdomain or context headers like Referer/tracestate, {:?}",e), 422),
};

if let Some(cached_response) = get_cached_req(&req, &config.cache_routes).await? {
return Ok(cached_response);
if is_cacheable_request(&req, &config) {
if let Some(cached_response) = get_cached_req(&req, &session_name).await {
return Ok(cached_response);
}
}

let body_bytes = match req.bytes().await {
Expand Down Expand Up @@ -170,38 +156,75 @@ async fn linkup_request_handler<'a, S: StringStore>(
let mut cf_resp =
convert_reqwest_response_to_cf(response, &additional_response_headers()).await?;

cf_resp = set_cached_req(&req, cf_resp, config.cache_routes).await?;
if is_cacheable_request(&req, &config) {
cf_resp = set_cached_req(&req, cf_resp, session_name).await?;
}

Ok(cf_resp)
}

#[event(fetch)]
pub async fn main(req: Request, env: Env, _ctx: worker::Context) -> Result<Response> {
// Optionally, get more helpful error messages written to the console in the case of a panic.
utils::set_panic_hook();
fn is_cacheable_request(req: &Request, config: &Session) -> bool {
if req.method() != Method::Get {
return false;
}

let kv = match env.kv("LINKUP_SESSIONS") {
Ok(kv) => kv,
Err(e) => return plaintext_error(format!("Failed to get KV store: {}", e), 500),
if config.session_token == PREVIEW_SESSION_TOKEN {
return true;
}

if let Some(routes) = &config.cache_routes {
let path = req.path();
if routes.iter().any(|route| route.is_match(&path)) {
return true;
}
}

false
}

fn get_cache_key(req: &Request, session_name: &String) -> Option<String> {
let mut cache_url = match req.url() {
Ok(url) => url,
Err(_) => return None,
};

let string_store = CfWorkerStringStore::new(kv);
let curr_domain = cache_url.domain().unwrap_or("example.com");
if cache_url
.set_host(Some(&format!("{}.{}", session_name, curr_domain)))
.is_err()
{
return None;
}

let sessions = SessionAllocator::new(&string_store);
Some(cache_url.to_string())
}

if let Ok(Some(upgrade)) = req.headers().get("upgrade") {
if upgrade == "websocket" {
return linkup_ws_handler(req, &sessions).await;
}
async fn get_cached_req(req: &Request, session_name: &String) -> Option<Response> {
let cache_key = match get_cache_key(req, session_name) {
Some(cache_key) => cache_key,
None => return None,
};

match Cache::default().get(cache_key, false).await {
Ok(Some(resp)) => Some(resp),
_ => None,
}
}

if req.method() == Method::Post && req.path() == "/linkup" {
return linkup_session_handler(req, &sessions).await;
async fn set_cached_req(
req: &Request,
mut resp: Response,
session_name: String,
) -> Result<Response> {
// Cache API throws error on 206 partial content
if resp.status_code() > 499 || resp.status_code() == 206 {
return Ok(resp);
}

if req.method() == Method::Post && req.path() == "/preview" {
return linkup_preview_handler(req, &sessions).await;
if let Some(cache_key) = get_cache_key(req, &session_name) {
let cache_resp = resp.cloned()?;
Cache::default().put(cache_key, cache_resp).await?;
}

linkup_request_handler(req, &sessions).await
Ok(resp)
}

0 comments on commit e60a633

Please sign in to comment.