Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parsed Posts ETS Caching #52

Merged
merged 3 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/epochtalk_server/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ defmodule EpochtalkServer.Application do
:poolboy.child_spec(:bbc_parser, bbc_parser_poolboy_config()),
# Start Role Cache
EpochtalkServer.Cache.Role,
# Start the ETS Cache
EpochtalkServer.Cache.ParsedPosts,
# Warm frontend_config variable (referenced by api controllers)
# This task starts, does its thing and dies
{Task, &EpochtalkServer.Models.Configuration.warm_frontend_config/0},
Expand Down
133 changes: 133 additions & 0 deletions lib/epochtalk_server/cache/parsed_posts.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
defmodule EpochtalkServer.Cache.ParsedPosts do
  @moduledoc """
  `ParsedPosts` cache genserver, used to cache parsed post data in a table in ETS.

  Entries are stored in the `:parsed_posts` table as `{key, value, expires}`
  tuples, where `expires` is a unix timestamp (in seconds) set `@expiry_days`
  days after the entry was written.

  The `expires` field is only consulted while purging; `get/1` returns an entry
  regardless of its expiry. Purging is triggered explicitly via
  `lookup_and_purge/0` once the table holds more than `@max_size` entries.
  """

  use GenServer
  require Logger
  import Ex2ms

  @table_name :parsed_posts
  @max_size 10_000
  @purge_size 1000
  @expiry_days 30

  ## === genserver functions ====

  @impl true
  def init(:ok), do: {:ok, setup()}

  @impl true
  def handle_call({:get, key}, _from, state) do
    case :ets.lookup(@table_name, key) do
      [{_key, cached_value, _expires}] -> {:reply, {:ok, cached_value}, state}
      [] -> {:reply, {:error, []}, state}
    end
  end

  @impl true
  def handle_call({:put, key, new_value}, _from, state) do
    expires = DateTime.add(DateTime.utc_now(), @expiry_days, :day) |> DateTime.to_unix()
    :ets.insert(@table_name, {key, new_value, expires})
    {:reply, {:ok, new_value}, state}
  end

  @impl true
  def handle_call({:need_update, key, new_value}, _from, state) do
    needs_update =
      case :ets.lookup(@table_name, key) do
        # new_value was updated since it was cached -> needs to be parsed,
        # otherwise the cached value can be used
        [{_key, cached_value, _expires}] ->
          updated_before?(cached_value.updated_at, new_value.updated_at)

        # key not found in cache, needs to be parsed
        [] ->
          true
      end

    {:reply, needs_update, state}
  end

  @impl true
  def handle_call(:lookup_and_purge, _from, state) do
    # if table size is greater than max size, purge
    # (the `if` must be the last expression so that its result is the reply)
    if :ets.info(@table_name, :size) > @max_size do
      purge()
      {:reply, true, state}
    else
      {:reply, false, state}
    end
  end

  ## === cache api functions ====

  @doc """
  Looks up `key` in the cache.

  Returns `{:ok, cached_value}` when the key is present, `{:error, []}` otherwise.
  """
  @spec get(key :: term()) :: {:ok, term()} | {:error, []}
  def get(key) do
    GenServer.call(__MODULE__, {:get, key})
  end

  @doc """
  Stores `new_value` under `key`, replacing any existing entry and resetting its
  expiry. Returns `{:ok, new_value}`.
  """
  @spec put(key :: term(), new_value :: term()) :: {:ok, term()}
  def put(key, new_value) do
    GenServer.call(__MODULE__, {:put, key, new_value})
  end

  @doc """
  Checks whether the entry cached under `key` is missing or older than `new_value`.

  Both the cached value and `new_value` must expose an `updated_at` field.
  Returns `true` when there is no cached entry or when the cached `updated_at`
  is earlier than `new_value.updated_at`, `false` otherwise.
  """
  @spec need_update(key :: term(), new_value :: map()) :: boolean()
  def need_update(key, new_value) do
    GenServer.call(__MODULE__, {:need_update, key, new_value})
  end

  @doc """
  Purges the cache if it holds more than the maximum number of entries.

  Returns `true` when a purge was performed, `false` otherwise.
  """
  @spec lookup_and_purge() :: boolean()
  def lookup_and_purge() do
    GenServer.call(__MODULE__, :lookup_and_purge)
  end

  @doc """
  Start genserver and create a reference for supervision tree
  """
  @spec start_link(opts :: term()) :: GenServer.on_start()
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  ## === private functions ====

  # setup ETS table, returns the initial genserver state
  defp setup() do
    :ets.new(@table_name, [:set, :public, :named_table, read_concurrency: true])
    %{}
  end

  # Chronological "cached is earlier than current" check.
  # Date/time structs must be compared with `compare/2`; `<` on structs is a
  # structural term comparison and does not follow chronological order.
  defp updated_before?(%NaiveDateTime{} = cached, %NaiveDateTime{} = current),
    do: NaiveDateTime.compare(cached, current) == :lt

  defp updated_before?(%DateTime{} = cached, %DateTime{} = current),
    do: DateTime.compare(cached, current) == :lt

  # any other term (e.g. integer timestamps or nil) keeps the plain term ordering
  defp updated_before?(cached, current), do: cached < current

  defp purge() do
    # keep table fixated while purging
    :ets.safe_fixtable(@table_name, true)
    do_purge()
  after
    # always release the fixation, even if purging raised
    :ets.safe_fixtable(@table_name, false)
  end

  defp do_purge() do
    now = DateTime.utc_now() |> DateTime.to_unix()

    # select keys that have expired
    expired_keys =
      :ets.select(
        @table_name,
        fun do
          {key, _, expires} when ^now > expires -> key
        end
      )

    case expired_keys do
      [] ->
        # if no keys have expired, purge from first key in the table
        purge_from_first(:ets.first(@table_name), 0)

      keys ->
        # if keys have expired, delete them
        Enum.each(keys, fn key -> :ets.delete(@table_name, key) end)
    end
  end

  # stop once @purge_size entries have been deleted
  defp purge_from_first(_, @purge_size), do: :ok

  # stop when the table has been exhausted
  defp purge_from_first(:"$end_of_table", _), do: :ok

  defp purge_from_first(key, count) do
    :ets.delete(@table_name, key)
    purge_from_first(:ets.next(@table_name, key), count + 1)
  end
end
125 changes: 79 additions & 46 deletions lib/epochtalk_server_web/json/post_json.ex
Original file line number Diff line number Diff line change
Expand Up @@ -450,20 +450,24 @@ defmodule EpochtalkServerWeb.Controllers.PostJSON do
{body_list, signature_list} =
posts
|> Enum.reduce({[], []}, fn post, {body_list, signature_list} ->
body = String.replace(Map.get(post, :body) || Map.get(post, :body_html), "'", "\'")

# add space to end if the last character is a backslash (fix for parser)
body_len = String.length(body)
last_char = String.slice(body, (body_len - 1)..body_len)
body = if last_char == "\\", do: body <> " ", else: body

signature =
if Map.get(post.user, :signature),
do: String.replace(post.user.signature, "'", "\'"),
else: nil

# return body/signature lists in reverse order
{[body | body_list], [signature | signature_list]}
if EpochtalkServer.Cache.ParsedPosts.need_update(post.id, post) do
body = String.replace(Map.get(post, :body) || Map.get(post, :body_html), "'", "\'")

# add space to end if the last character is a backslash (fix for parser)
body_len = String.length(body)
last_char = String.slice(body, (body_len - 1)..body_len)
body = if last_char == "\\", do: body <> " ", else: body

signature =
if Map.get(post.user, :signature),
do: String.replace(post.user.signature, "'", "\'"),
else: nil

# return body/signature lists in reverse order
{[body | body_list], [signature | signature_list]}
else
{[nil | body_list], [nil | signature_list]}
end
end)

# reverse body/signature lists
Expand All @@ -487,37 +491,66 @@ defmodule EpochtalkServerWeb.Controllers.PostJSON do

# Combines each post with its parsed body and parsed signature; the three lists
# are zipped positionally. Parsed entries are matched as `{:ok, value}` or
# `{:timeout, unparsed_value}` tuples. Returns the list of merged posts after
# asking the ParsedPosts cache to purge itself if needed.
defp zip_posts(posts, parsed_body_list, parsed_signature_list) do
# zip posts with body/signature lists
# NOTE(review): the result of this first zip_with is not bound and the same call
# is repeated below as `zipped_posts` — this looks like the pre-change body
# carried over from the diff view; confirm it is not part of the merged function.
Enum.zip_with(
[posts, parsed_body_list, parsed_signature_list],
fn [post, parsed_body, parsed_signature] ->
parsed_body =
case parsed_body do
{:ok, parsed_body} ->
Logger.debug("#{__MODULE__}(body): post_id #{inspect(post.id)}")
parsed_body

{:timeout, unparsed_body} ->
Logger.error("#{__MODULE__}(body timeout): post_id #{inspect(post.id)}")
unparsed_body
end

parsed_signature =
case parsed_signature do
{:ok, parsed_signature} ->
Logger.debug("#{__MODULE__}(signature): user_id #{inspect(post.user.id)}")
parsed_signature

{:timeout, unparsed_signature} ->
Logger.error("#{__MODULE__}(signature timeout): user_id #{inspect(post.user.id)}")
unparsed_signature
end

user = post.user |> Map.put(:signature, parsed_signature)

post
|> Map.put(:body_html, parsed_body)
|> Map.put(:user, user)
end
)
zipped_posts =
Enum.zip_with(
[posts, parsed_body_list, parsed_signature_list],
fn [post, parsed_body, parsed_signature] ->
parsed_body =
case parsed_body do
{:ok, parsed_body} ->
Logger.debug("#{__MODULE__}(body): post_id #{inspect(post.id)}")
parsed_body

# on timeout, log an error and fall back to the unparsed body
{:timeout, unparsed_body} ->
Logger.error("#{__MODULE__}(body timeout): post_id #{inspect(post.id)}")
unparsed_body
end

parsed_signature =
case parsed_signature do
{:ok, parsed_signature} ->
Logger.debug("#{__MODULE__}(signature): user_id #{inspect(post.user.id)}")
parsed_signature

# on timeout, log an error and fall back to the unparsed signature
{:timeout, unparsed_signature} ->
Logger.error("#{__MODULE__}(signature timeout): user_id #{inspect(post.user.id)}")
unparsed_signature
end

user = post.user |> Map.put(:signature, parsed_signature)

# a falsy parsed_body is treated as "not parsed" — presumably the post was
# skipped because the cache reported it as up to date; verify against caller
post =
if parsed_body do
# post was parsed, store it in cache
# (only body_html and updated_at are cached; the signature is not)
EpochtalkServer.Cache.ParsedPosts.put(post.id, %{
body_html: parsed_body,
updated_at: post.updated_at
})

post
|> Map.put(:body_html, parsed_body)
|> Map.put(:user, user)
else
# post was not parsed, get value from cache
post =
case EpochtalkServer.Cache.ParsedPosts.get(post.id) do
{:ok, cached_post} ->
post
|> Map.put(:body_html, cached_post.body_html)
|> Map.put(:user, user)

# cache miss: this post becomes nil in the resulting list
{:error, _} ->
nil
end

post
end

post
end
)

# purge check runs once per call, after all posts have been processed
EpochtalkServer.Cache.ParsedPosts.lookup_and_purge()
zipped_posts
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ defmodule EpochtalkServer.MixProject do
{:dotenv_parser, "~> 2.0"},
{:earmark, "~> 1.4"},
{:ecto_sql, "~> 3.6"},
{:ex2ms, "~> 1.0"},
{:ex_aws, "~> 2.5"},
{:ex_aws_s3, "~> 2.5"},
{:ex_doc, "~> 0.29.4"},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"ecto_sql": {:hex, :ecto_sql, "3.11.3", "4eb7348ff8101fbc4e6bbc5a4404a24fecbe73a3372d16569526b0cf34ebc195", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.11.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e5f36e3d736b99c7fee3e631333b8394ade4bafe9d96d35669fca2d81c2be928"},
"elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"},
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"ex2ms": {:hex, :ex2ms, "1.7.0", "45b9f523d0b777667ded60070d82d871a37e294f0b6c5b8eca86771f00f82ee1", [:mix], [], "hexpm", "2589eee51f81f1b1caa6d08c990b1ad409215fe6f64c73f73c67d36ed10be827"},
"ex_aws": {:hex, :ex_aws, "2.5.7", "dbcda183903cded392742129bd5c67ccf59caed4ded604d5e68b96e75570d743", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8 or ~> 3.0", [hex: :jsx, repo: "hexpm", optional: true]}, {:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:req, "~> 0.3", [hex: :req, repo: "hexpm", optional: true]}, {:sweet_xml, "~> 0.7", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2c3c577550bfc4d0899e9fed9aeef91bc6a2aedd0177b1faa726c9b20d005074"},
"ex_aws_s3": {:hex, :ex_aws_s3, "2.5.5", "d718e90e0e4803c5605ca1a7cebf44236f7439f6706151ca4485fc2dffd08bc1", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "ac34af1e9b3974168dda798d2fded5d12d52a1b5cf52abfeffed2a63d2eb5443"},
"ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"},
Expand Down
Loading