Skip to content
This repository was archived by the owner on Mar 16, 2024. It is now read-only.

Commit b35a013

Browse files
committed
Initial commit
1 parent 2016121 commit b35a013

14 files changed

+468
-0
lines changed

.formatter.exs

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Used by "mix format"
2+
[
3+
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
4+
]

.gitignore

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# The directory Mix will write compiled artifacts to.
2+
/_build/
3+
4+
# If you run "mix test --cover", coverage assets end up here.
5+
/cover/
6+
7+
# The directory Mix downloads your dependencies sources to.
8+
/deps/
9+
10+
# Where third-party dependencies like ExDoc output generated docs.
11+
/doc/
12+
13+
# Ignore .fetch files in case you like to edit your project deps locally.
14+
/.fetch
15+
16+
# If the VM crashes, it generates a dump, let's ignore it too.
17+
erl_crash.dump
18+
19+
# Also ignore archive artifacts (built via "mix archive.build").
20+
*.ez
21+
22+
# Ignore package tarball (built via "mix hex.build").
23+
queutils-*.tar
24+

README.md

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Queutils
2+
3+
Handy little queues and producers to make using `GenStage` a breeze.
4+
5+
## Installation
6+
7+
If [available in Hex](https://hex.pm/docs/publish), the package can be installed
8+
by adding `queutils` to your list of dependencies in `mix.exs`:
9+
10+
```elixir
11+
def deps do
12+
[
13+
{:queutils, "~> 0.1.0"}
14+
]
15+
end
16+
```
17+
18+
Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
19+
and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
20+
be found at [https://hexdocs.pm/queutils](https://hexdocs.pm/queutils).

lib/queutils.ex

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
defmodule Queutils do
2+
@moduledoc """
3+
Handy little queues and producers.
4+
"""
5+
end

lib/queutils/blocking_producer.ex

+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
defmodule Queutils.BlockingProducer do
2+
use GenStage
3+
4+
@moduledoc """
5+
A `GenStage` producer that acts as a blocking queue, with a fixed length.
6+
Blocks any time `Queutils.BlockingProducer.push/2` is called when the queue
7+
is at its maximum length.
8+
9+
This can be used as an entry-point to a `GenStage` pipeline, since the
10+
max queue length provides for back-pressure.
11+
You can even set the queue's length to zero in order to block all pushes
12+
until demand comes in.
13+
14+
15+
## Usage
16+
17+
Add it to your application supervisor's `start/2` function like this:
18+
19+
def start(_type, _args) do
20+
children = [
21+
...
22+
{Queutils.BlockingProducer, name: MessageProducer, max_length: 10_000},
23+
...
24+
]
25+
26+
opts = [strategy: :one_for_one, name: MyApplication.Supervisor]
27+
Supervisor.start_link(children, opts)
28+
end
29+
30+
Then, you can push messages to the queue like this:
31+
32+
:ok = Queutils.BlockingProducer.push(MessageProducer, :my_message)
33+
34+
35+
Broadway users be forewared! A Broadway module needs to start its producer itself,
36+
so it's not possible to customize the process ID a la the `:name` option documented below.
37+
If you're in that boat, you should use a `Queutils.BlockingQueue` along with
38+
a `Queutils.BlockingQueueProducer`, so you can customize your reference to your `BlockingQueue`.
39+
40+
## Options
41+
42+
- `:name` - the ID of the queue. This will be the first argument to the `push/2` function.
43+
- `:max_length` - The maximum number of messages that this process will store until it starts blocking. Default is 1,000.
44+
- `:dispatcher` - The `GenStage` dispatcher that this producer should use. Default is `GenStage.DemandDispatcher`.
45+
"""
46+
47+
def start_link(opts) do
48+
case Keyword.fetch(opts, :name) do
49+
{:ok, name} -> GenStage.start_link(__MODULE__, opts, name: name)
50+
:error -> GenStage.start_link(__MODULE__, opts)
51+
end
52+
end
53+
54+
def child_spec(opts) do
55+
%{
56+
id: Keyword.get(opts, :name, Queutils.BlockingProducer),
57+
start: {__MODULE__, :start_link, [opts]},
58+
type: :supervisor
59+
}
60+
end
61+
62+
def push(queue, msg) do
63+
GenStage.call(queue, {:push, msg})
64+
end
65+
66+
@impl true
67+
def init(opts) do
68+
dispatcher = Keyword.get(opts, :dispatcher, GenStage.DemandDispatcher)
69+
max_length = Keyword.get(opts, :max_length, 1_000)
70+
unless max_length >= 0 do
71+
raise "Invalid argument :max_length. Must be an integer zero or greater, but was #{inspect max_length}"
72+
end
73+
{:producer, %{queue: [], waiting: [], demand: 0, max_length: max_length}, dispatcher: dispatcher}
74+
end
75+
76+
@impl true
77+
def handle_call({:push, msg}, from, state) do
78+
:ok = validate_state(state)
79+
cond do
80+
state.demand > 0 ->
81+
remaining_demand = state.demand - 1
82+
{:reply, :ok, [msg], %{state | demand: remaining_demand}}
83+
Enum.count(state.queue) >= state.max_length ->
84+
waiting = state.waiting ++ [{from, msg}]
85+
{:noreply, [], %{state | waiting: waiting}}
86+
true ->
87+
queue = state.queue ++ [msg]
88+
{:reply, :ok, [], %{state | queue: queue}}
89+
end
90+
end
91+
92+
@impl true
93+
def handle_demand(demand, state) do
94+
:ok = validate_state(state)
95+
96+
total_demand = demand + state.demand
97+
98+
{popped, remaining} = Enum.split(state.queue, total_demand)
99+
{popped_waiters, still_waiting} = Enum.split(state.waiting, total_demand)
100+
101+
msgs_from_waiters = Enum.map(popped_waiters, fn {from, msg} ->
102+
GenStage.reply(from, :ok)
103+
msg
104+
end)
105+
106+
queue = remaining ++ msgs_from_waiters
107+
remaining_demand = total_demand - Enum.count(queue)
108+
109+
{:noreply, popped, %{state | demand: remaining_demand, queue: queue, waiting: still_waiting}}
110+
end
111+
112+
defp validate_state(state) do
113+
# If we have a non-zero demand, it is assumed that we will not have
114+
# anyone waiting and that the queue is empty, since we would have
115+
# popped off all those messages before building up any demand.
116+
117+
cond do
118+
state.demand < 0 ->
119+
raise "Incorrect state: BlockingProducer has a negative demand (demand is #{inspect state.demand})."
120+
state.demand > 0 && not Enum.empty?(state.queue) ->
121+
raise "Incorrect state: BlockingProducer has demand but also has items in its queue."
122+
state.demand > 0 && not Enum.empty?(state.waiting) ->
123+
raise "Incorrect state: BlockingProducer has demand but also has processes waiting to insert."
124+
true -> :ok
125+
end
126+
end
127+
end

lib/queutils/blocking_queue.ex

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
defmodule Queutils.BlockingQueue do
2+
use GenServer
3+
4+
@moduledoc """
5+
A queue with a fixed length that blocks on `pop/1` if the queue is full.
6+
7+
## Usage
8+
9+
Add it to your application supervisor's `start/2` function, after the queue it pulls from, like this:
10+
11+
def start(_type, _args) do
12+
children = [
13+
...
14+
{Queutils.BlockingQueue, name: MessageQueue, max_length: 10_000},
15+
...
16+
]
17+
18+
opts = [strategy: :one_for_one, name: MyApplication.Supervisor]
19+
Supervisor.start_link(children, opts)
20+
end
21+
22+
Then you can push and pop from the queue like this:
23+
24+
:ok = Queutils.Blockingqueue.push(MessageQueue, :my_message)
25+
[:my_message] = Queutils.Blockingqueue.pop(MessageQueue, 1)
26+
27+
## Options
28+
29+
- `:name` - the ID of the queue. This will be the first argument to the `push/2` function. Default is `BlockingQueue`.
30+
- `:max_length` - The maximum number of messages that this process will store until it starts blocking. Default is 1,000.
31+
"""
32+
33+
def start_link(opts) do
34+
name = Keyword.get(opts, :name)
35+
GenServer.start_link(__MODULE__, opts, name: name)
36+
end
37+
38+
def child_spec(opts) do
39+
%{
40+
id: Keyword.get(opts, :name, BlockingQueue),
41+
start: {__MODULE__, :start_link, [opts]},
42+
type: :supervisor
43+
}
44+
end
45+
46+
def init(opts) do
47+
max_length = Keyword.get(opts, :max_length, 1_000)
48+
{:ok, %{max_length: max_length, queue: [], waiting: []}}
49+
end
50+
51+
def push(queue, msg) do
52+
GenServer.call(queue, {:push, msg})
53+
end
54+
55+
def pop(queue, count) do
56+
GenServer.call(queue, {:pop, count})
57+
end
58+
59+
def length(queue) do
60+
GenServer.call(queue, :length)
61+
end
62+
63+
def handle_call(:length, _from, state) do
64+
{:reply, Enum.count(state.queue), state}
65+
end
66+
67+
def handle_call({:push, msg}, from, state) do
68+
if Enum.count(state.queue) >= state.max_length do
69+
waiting = state.waiting ++ [{from, msg}]
70+
{:noreply, %{state | waiting: waiting}}
71+
else
72+
queue = state.queue ++ [msg]
73+
{:reply, :ok, %{state | queue: queue}}
74+
end
75+
end
76+
77+
def handle_call({:pop, count}, _from, state) do
78+
{popped, remaining} = Enum.split(state.queue, count)
79+
{popped_waiters, still_waiting} = Enum.split(state.waiting, count)
80+
81+
msgs_from_waiters = Enum.map(popped_waiters, fn {from, msg} ->
82+
GenServer.reply(from, :ok)
83+
msg
84+
end)
85+
86+
queue = remaining ++ msgs_from_waiters
87+
88+
{:reply, popped, %{state | queue: queue, waiting: still_waiting}}
89+
end
90+
end
+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
defmodule Queutils.BlockingQueueProducer do
2+
use GenStage
3+
require Logger
4+
5+
@moduledoc """
6+
A `GenStage` producer that polls a `Queutils.BlockingQueue` at a fixed interval,
7+
emitting any events on the queue.
8+
9+
## Usage
10+
11+
Add it to your application supervisor's `start/2` function, after the queue it pulls from, like this:
12+
13+
def start(_type, _args) do
14+
children = [
15+
...
16+
{Queutils.BlockingQueue, name: MessageQueue, max_length: 10_000},
17+
{Queutils.BlockingQueueProducer, name: MessageProducer},
18+
...
19+
]
20+
21+
opts = [strategy: :one_for_one, name: MyApplication.Supervisor]
22+
Supervisor.start_link(children, opts)
23+
end
24+
25+
The subscribe a consumer to it, like any other `GenStage` producer.
26+
27+
def init(_opts) do
28+
{:consumer, :my_consumer_state, [subscribe_to: MessageProducer]}
29+
end
30+
31+
## Options
32+
33+
- `:name` - the ID of the queue. This will be the first argument to the `push/2` function. Default is `BlockingProducer`.
34+
- `:max_length` - The maximum number of messages that this process will store until it starts blocking. Default is 1,000.
35+
- `:dispatcher` - The `GenStage` dispatcher that this producer should use. Default is `GenStage.DemandDispatcher`.
36+
"""
37+
38+
def start_link(opts) do
39+
name = Keyword.get(opts, :name, BlockingQueueProducer)
40+
GenStage.start_link(__MODULE__, opts, name: name)
41+
end
42+
43+
def child_spec(opts) do
44+
%{
45+
id: Keyword.get(opts, :name, BlockingQueueProducer),
46+
start: {__MODULE__, :start_link, [opts]},
47+
type: :supervisor
48+
}
49+
end
50+
51+
@impl true
52+
def init(opts) do
53+
poll_interval = Keyword.get(opts, :poll_interval, 250)
54+
dispatcher = Keyword.get(opts, :dispatcher, GenStage.DemandDispatcher)
55+
queue = Keyword.get(opts, :queue, BlockingQueue)
56+
Process.send_after(self(), :poll, poll_interval)
57+
{:producer, %{queue: queue, demand: 0, poll_interval: poll_interval}, dispatcher: dispatcher}
58+
end
59+
60+
@impl true
61+
def handle_info(:poll, state) do
62+
events = Queutils.BlockingQueue.pop(state.queue, state.demand)
63+
remaining_demand = state.demand - Enum.count(events)
64+
65+
Process.send_after(self(), :poll, state.poll_interval)
66+
{:noreply, events, %{state | demand: remaining_demand}}
67+
end
68+
69+
@impl true
70+
def handle_demand(demand, state) do
71+
total_demand = demand + state.demand
72+
events = Queutils.BlockingQueue.pop(state.queue, total_demand)
73+
remaining_demand = total_demand - Enum.count(events)
74+
{:noreply, events, %{state | demand: remaining_demand}}
75+
end
76+
end

mix.exs

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
defmodule Queutils.MixProject do
2+
use Mix.Project
3+
4+
def project do
5+
[
6+
app: :queutils,
7+
version: "1.0.0",
8+
elixir: "~> 1.9",
9+
start_permanent: Mix.env() == :prod,
10+
deps: deps(),
11+
description: "Handy little queues and producers.",
12+
source_url: "https://github.com/cantido/queutils",
13+
homepage_url: "https://github.com/cantido/queutils",
14+
package: [
15+
maintainers: ["Rosa Richter"],
16+
licenses: ["GPL-3.0-or-later"],
17+
links: %{"GitHub" => "https://github.com/cantido/queutils"},
18+
],
19+
docs: [
20+
main: "Queutils",
21+
source_url: "https://github.com/cantido/queutils",
22+
extras: [
23+
"README.md"
24+
]
25+
]
26+
]
27+
end
28+
29+
# Run "mix help compile.app" to learn about applications.
30+
def application do
31+
[
32+
extra_applications: [:logger]
33+
]
34+
end
35+
36+
# Run "mix help deps" to learn about dependencies.
37+
defp deps do
38+
[
39+
{:gen_stage, "~> 1.0"}
40+
]
41+
end
42+
end

mix.lock

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
%{
2+
"gen_stage": {:hex, :gen_stage, "1.0.0", "51c8ae56ff54f9a2a604ca583798c210ad245f415115453b773b621c49776df5", [:mix], [], "hexpm"},
3+
}

0 commit comments

Comments
 (0)