Skip to content

Commit

Permalink
Merge pull request #52 from slickage/ets-caching
Browse files Browse the repository at this point in the history
Parsed Posts ETS Caching
  • Loading branch information
akinsey authored Feb 18, 2025
2 parents bf88b01 + 0de504c commit 571c0c5
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 46 deletions.
2 changes: 2 additions & 0 deletions lib/epochtalk_server/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ defmodule EpochtalkServer.Application do
:poolboy.child_spec(:bbc_parser, bbc_parser_poolboy_config()),
# Start Role Cache
EpochtalkServer.Cache.Role,
# Start the ETS Cache
EpochtalkServer.Cache.ParsedPosts,
# Warm frontend_config variable (referenced by api controllers)
# This task starts, does its thing and dies
{Task, &EpochtalkServer.Models.Configuration.warm_frontend_config/0},
Expand Down
133 changes: 133 additions & 0 deletions lib/epochtalk_server/cache/parsed_posts.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
defmodule EpochtalkServer.Cache.ParsedPosts do
use GenServer
require Logger
import Ex2ms

@table_name :parsed_posts
@max_size 10_000
@purge_size 1000
@expiry_days 30

@moduledoc """
`ParsedPosts` cache genserver, used to cache parsed post data in a table in ETS
"""

## === genserver functions ====

@impl true
def init(:ok), do: {:ok, setup()}

@impl true
def handle_call({:get, key}, _from, state) do
case :ets.lookup(@table_name, key) do
[{_key, cached_value, _expires}] -> {:reply, {:ok, cached_value}, state}
[] -> {:reply, {:error, []}, state}
end
end

@impl true
def handle_call({:put, key, new_value}, _from, state) do
expires = DateTime.add(DateTime.utc_now(), @expiry_days, :day) |> DateTime.to_unix()
:ets.insert(@table_name, {key, new_value, expires})
{:reply, {:ok, new_value}, state}
end

@impl true
def handle_call({:need_update, key, new_value}, _from, state) do
case :ets.lookup(@table_name, key) do
[{_key, cached_value, _expires}] ->
if cached_value.updated_at < new_value.updated_at do
# new_value was updated since it was cached, needs to be parsed
{:reply, true, state}
else
# new_value is not updated, use cached value
{:reply, false, state}
end

# key not found in cache, needs to be parsed
[] ->
{:reply, true, state}
end
end

@impl true
def handle_call(:lookup_and_purge, _from, state) do
# if table size is greater than max size, purge
if :ets.info(@table_name, :size) > @max_size do
purge()
{:reply, true, state}
end

{:reply, false, state}
end

## === cache api functions ====

def get(key) do
GenServer.call(__MODULE__, {:get, key})
end

def put(key, new_value) do
GenServer.call(__MODULE__, {:put, key, new_value})
end

def need_update(key, new_value) do
GenServer.call(__MODULE__, {:need_update, key, new_value})
end

def lookup_and_purge() do
GenServer.call(__MODULE__, :lookup_and_purge)
end

@doc """
Start genserver and create a reference for supervision tree
"""
def start_link(_opts) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end

## === private functions ====

# setup ETS table
defp setup() do
:ets.new(@table_name, [:set, :public, :named_table, read_concurrency: true])
{:ok, %{}}
end

defp purge() do
# keep table fixated while purging
:ets.safe_fixtable(@table_name, true)
do_purge()
after
:ets.safe_fixtable(@table_name, false)
end

defp do_purge() do
count = 0
now = DateTime.utc_now() |> DateTime.to_unix()
# select keys that have expired
case :ets.select(
@table_name,
fun do
{key, _, expires} when ^now > expires -> key
end
) do
[] ->
# if no keys have expired, purge from first key in the table
purge_from_first(:ets.first(@table_name), count)

keys ->
# if keys have expired, delete them
Enum.each(keys, fn key -> :ets.delete(@table_name, key) end)
end
end

defp purge_from_first(_, @purge_size), do: :ok

defp purge_from_first(:"$end_of_table", _), do: :ok

defp purge_from_first(key, count) do
:ets.delete(@table_name, key)
purge_from_first(:ets.next(@table_name, key), count + 1)
end
end
125 changes: 79 additions & 46 deletions lib/epochtalk_server_web/json/post_json.ex
Original file line number Diff line number Diff line change
Expand Up @@ -460,20 +460,24 @@ defmodule EpochtalkServerWeb.Controllers.PostJSON do
{body_list, signature_list} =
posts
|> Enum.reduce({[], []}, fn post, {body_list, signature_list} ->
body = String.replace(Map.get(post, :body) || Map.get(post, :body_html), "'", "\'")

# add space to end if the last character is a backslash (fix for parser)
body_len = String.length(body)
last_char = String.slice(body, (body_len - 1)..body_len)
body = if last_char == "\\", do: body <> " ", else: body

signature =
if Map.get(post.user, :signature),
do: String.replace(post.user.signature, "'", "\'"),
else: nil

# return body/signature lists in reverse order
{[body | body_list], [signature | signature_list]}
if EpochtalkServer.Cache.ParsedPosts.need_update(post.id, post) do
body = String.replace(Map.get(post, :body) || Map.get(post, :body_html), "'", "\'")

# add space to end if the last character is a backslash (fix for parser)
body_len = String.length(body)
last_char = String.slice(body, (body_len - 1)..body_len)
body = if last_char == "\\", do: body <> " ", else: body

signature =
if Map.get(post.user, :signature),
do: String.replace(post.user.signature, "'", "\'"),
else: nil

# return body/signature lists in reverse order
{[body | body_list], [signature | signature_list]}
else
{[nil | body_list], [nil | signature_list]}
end
end)

# reverse body/signature lists
Expand All @@ -497,37 +501,66 @@ defmodule EpochtalkServerWeb.Controllers.PostJSON do

defp zip_posts(posts, parsed_body_list, parsed_signature_list) do
# zip posts with body/signature lists
Enum.zip_with(
[posts, parsed_body_list, parsed_signature_list],
fn [post, parsed_body, parsed_signature] ->
parsed_body =
case parsed_body do
{:ok, parsed_body} ->
Logger.debug("#{__MODULE__}(body): post_id #{inspect(post.id)}")
parsed_body

{:timeout, unparsed_body} ->
Logger.error("#{__MODULE__}(body timeout): post_id #{inspect(post.id)}")
unparsed_body
end

parsed_signature =
case parsed_signature do
{:ok, parsed_signature} ->
Logger.debug("#{__MODULE__}(signature): user_id #{inspect(post.user.id)}")
parsed_signature

{:timeout, unparsed_signature} ->
Logger.error("#{__MODULE__}(signature timeout): user_id #{inspect(post.user.id)}")
unparsed_signature
end

user = post.user |> Map.put(:signature, parsed_signature)

post
|> Map.put(:body_html, parsed_body)
|> Map.put(:user, user)
end
)
zipped_posts =
Enum.zip_with(
[posts, parsed_body_list, parsed_signature_list],
fn [post, parsed_body, parsed_signature] ->
parsed_body =
case parsed_body do
{:ok, parsed_body} ->
Logger.debug("#{__MODULE__}(body): post_id #{inspect(post.id)}")
parsed_body

{:timeout, unparsed_body} ->
Logger.error("#{__MODULE__}(body timeout): post_id #{inspect(post.id)}")
unparsed_body
end

parsed_signature =
case parsed_signature do
{:ok, parsed_signature} ->
Logger.debug("#{__MODULE__}(signature): user_id #{inspect(post.user.id)}")
parsed_signature

{:timeout, unparsed_signature} ->
Logger.error("#{__MODULE__}(signature timeout): user_id #{inspect(post.user.id)}")
unparsed_signature
end

user = post.user |> Map.put(:signature, parsed_signature)

post =
if parsed_body do
# post was parsed, store it in cache
EpochtalkServer.Cache.ParsedPosts.put(post.id, %{
body_html: parsed_body,
updated_at: post.updated_at
})

post
|> Map.put(:body_html, parsed_body)
|> Map.put(:user, user)
else
# post was not parsed, get value from cache
post =
case EpochtalkServer.Cache.ParsedPosts.get(post.id) do
{:ok, cached_post} ->
post
|> Map.put(:body_html, cached_post.body_html)
|> Map.put(:user, user)

{:error, _} ->
nil
end

post
end

post
end
)

EpochtalkServer.Cache.ParsedPosts.lookup_and_purge()
zipped_posts
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ defmodule EpochtalkServer.MixProject do
{:dotenv_parser, "~> 2.0"},
{:earmark, "~> 1.4"},
{:ecto_sql, "~> 3.6"},
{:ex2ms, "~> 1.0"},
{:ex_aws, "~> 2.5"},
{:ex_aws_s3, "~> 2.5"},
{:ex_doc, "~> 0.29.4"},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"ecto_sql": {:hex, :ecto_sql, "3.11.3", "4eb7348ff8101fbc4e6bbc5a4404a24fecbe73a3372d16569526b0cf34ebc195", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.11.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e5f36e3d736b99c7fee3e631333b8394ade4bafe9d96d35669fca2d81c2be928"},
"elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"},
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"ex2ms": {:hex, :ex2ms, "1.7.0", "45b9f523d0b777667ded60070d82d871a37e294f0b6c5b8eca86771f00f82ee1", [:mix], [], "hexpm", "2589eee51f81f1b1caa6d08c990b1ad409215fe6f64c73f73c67d36ed10be827"},
"ex_aws": {:hex, :ex_aws, "2.5.7", "dbcda183903cded392742129bd5c67ccf59caed4ded604d5e68b96e75570d743", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8 or ~> 3.0", [hex: :jsx, repo: "hexpm", optional: true]}, {:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:req, "~> 0.3", [hex: :req, repo: "hexpm", optional: true]}, {:sweet_xml, "~> 0.7", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2c3c577550bfc4d0899e9fed9aeef91bc6a2aedd0177b1faa726c9b20d005074"},
"ex_aws_s3": {:hex, :ex_aws_s3, "2.5.5", "d718e90e0e4803c5605ca1a7cebf44236f7439f6706151ca4485fc2dffd08bc1", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "ac34af1e9b3974168dda798d2fded5d12d52a1b5cf52abfeffed2a63d2eb5443"},
"ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"},
Expand Down

0 comments on commit 571c0c5

Please sign in to comment.