# Rate Limiting
```elixir
Mix.install([
  {:kino, "~> 0.14.2"}
])
```
## Introduction
A *rate limiter* is a component that sits in front of a resource (or a pool of resources) and restricts the rate of requests to it. For simplicity, we will work with a single resource in the form of a `Worker` GenServer:
<!-- livebook:{"break_markdown":true} -->
```mermaid
graph LR;
RL[Rate Limiter];
C1[Client 1]-->RL;
C2[Client 2]-->RL;
C3[Client 3]-->RL;
RL-->W[Worker];
```
<!-- livebook:{"break_markdown":true} -->
The rate limiter needs to be informed about the maximum allowed rate. This can take a number of forms. Some of these are:
* A feedback mechanism allowing the resource to control the flow.
* A period length and a number of requests to allow during this period.
* A number of requests to allow during a fixed period.
* A minimum period in between requests.
For reasons of simplicity, I have chosen to go for the last option. In a real-world scenario, I would go for a pluggable strategy that can be decided at initialization time.
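
The chosen option can be reduced to a small pure function: given when the previous request was let through, the current time, and the minimum period, how long must the next request wait? The sketch below is illustrative only. The module `MinInterval` and its `delay/3` function are hypothetical names and are not used by the implementation later in this notebook, which simply sleeps for a full period after each request.

```elixir
defmodule MinInterval do
  @moduledoc "Hypothetical helper: spacing requests by a minimum period."

  @doc """
  Returns how many milliseconds a request arriving at `now` must wait,
  given that the previous request was let through at `last` (or `nil`
  if there was none) and the minimum period between requests is `period`.
  """
  def delay(nil, _now, _period), do: 0

  def delay(last, now, period) when is_integer(last) do
    max(last + period - now, 0)
  end
end

# Previous request at t = 1_000 ms, period 500 ms:
MinInterval.delay(1_000, 1_200, 500)  # 300 ms left to wait
MinInterval.delay(1_000, 1_700, 500)  # 0, the period has already elapsed
```

A pluggable strategy could expose the same question ("how long must this request wait?") behind a common function, with each strategy answering it differently.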
<!-- livebook:{"break_markdown":true} -->
### Design
<!-- livebook:{"break_markdown":true} -->
I have decided on a design where
1. Clients don't know about the rate limiter, but the workers do. Client developers will thus only need to care about the interface of the worker.
2. The interface side of the worker calls a generic interface function on the rate limiter. This means that the rate limiter's interface is independent of the functionality offered by the worker: adding a service to the worker does not affect the rate limiter.
<!-- livebook:{"break_markdown":true} -->
For a cast message the design looks like this:
```mermaid
sequenceDiagram
Client-->>Worker: Specific interface function call
Worker-->>Rate Limiter: Generic interface function call
Rate Limiter->>Rate Limiter: cast
Rate Limiter->>Worker: cast
```
<!-- livebook:{"break_markdown":true} -->
Here, the role of the cast in the `Rate Limiter` is to queue the request, so that rate limiting can be implemented using the message inbox as the queue. This is a volatile design choice: queued requests live only in the rate limiter's inbox and are lost if that process terminates.
<!-- livebook:{"break_markdown":true} -->
While this setup protects the worker, the client is oblivious to the rate limiting. At times that may be fine, but it does come with a risk: if the client produces work packages faster than the rate limit allows, the inbox of the rate limiter will grow without bound. This is the practical equivalent of a memory leak, and it violates the client's expectation of near-immediate processing. Some sort of flow control (e.g., back pressure) is needed. Calls are immune to this issue, as they are synchronous by design.
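
One crude form of flow control for casts is to look at the length of the target's inbox before sending, and to refuse the request when it is too long. The sketch below assumes a local process and uses `Process.info/2` with `:message_queue_len`. The module `BoundedCast` and the `max_queue` parameter are hypothetical and not part of the implementation in this notebook.

```elixir
defmodule BoundedCast do
  @moduledoc "Hypothetical helper: refuse casts when the target's inbox is too long."

  @doc """
  Casts `message` to the local process `pid` unless its inbox already
  holds `max_queue` or more messages.
  """
  def cast(pid, message, max_queue) when is_pid(pid) do
    case Process.info(pid, :message_queue_len) do
      {:message_queue_len, len} when len < max_queue ->
        GenServer.cast(pid, message)

      {:message_queue_len, _len} ->
        # Inbox is full according to our limit: tell the caller to back off.
        {:error, :overloaded}

      nil ->
        # Process.info/2 returns nil when the process is not alive.
        {:error, :noproc}
    end
  end
end
```

The check and the send are two separate steps, so with several concurrent senders the limit is only approximate. It does, however, give the client a signal that it is producing faster than the rate limiter drains.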
<!-- livebook:{"break_markdown":true} -->
For a call message the design looks like this:
```mermaid
sequenceDiagram
Client-->>Worker: Specific interface function call
Worker-->>Rate Limiter: Generic interface function call
Rate Limiter->>Rate Limiter: call
Rate Limiter->>Worker: call
Worker->>Client: response
```
<!-- livebook:{"break_markdown":true} -->
The interesting part of this setup is that the call being issued from the client side (at the top of the diagram) is being forwarded from the `Rate Limiter` to the `Worker`. The responsibility of responding is thus delegated to the `Worker` process. This has two implications:
1. The callback handler in the `Rate Limiter` forwards the request by constructing and sending a `:"$gen_call"` message that carries the client's `from`, and then returns a `:noreply` tuple.
2. The callback handler in the `Worker` process replies as usual. Since `from` identifies the client, the response goes directly to the client process.
The effect is that the initial call by the client process waits for the worker, which it reached by proxy of the rate limiter, instead of waiting for the rate limiter itself.
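
The forwarding of a call can be tried out in isolation. The following is a minimal sketch with two hypothetical modules, `ForwardingSketch.Proxy` and `ForwardingSketch.Backend`, which are not part of the implementation in this notebook. It assumes that the internal `:"$gen_call"` message format stays as it is today, which is not a documented guarantee.

```elixir
defmodule ForwardingSketch.Backend do
  use GenServer

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(state), do: {:ok, state}

  # The reply goes to whoever `from` identifies, which is the original
  # caller rather than the proxy. The backend pid is included as proof.
  @impl true
  def handle_call({:echo, term}, _from, state) do
    {:reply, {:from_backend, self(), term}, state}
  end
end

defmodule ForwardingSketch.Proxy do
  use GenServer

  def start_link(backend), do: GenServer.start_link(__MODULE__, backend)

  def echo(proxy, term), do: GenServer.call(proxy, {:echo, term})

  @impl true
  def init(backend), do: {:ok, backend}

  @impl true
  def handle_call(request, from, backend) do
    # Hand-build the message GenServer.call/2 would have sent, but keep
    # the original `from` so that the backend answers the caller.
    send(backend, {:"$gen_call", from, request})
    {:noreply, backend}
  end
end

{:ok, backend} = ForwardingSketch.Backend.start_link()
{:ok, proxy} = ForwardingSketch.Proxy.start_link(backend)

# The caller talks to the proxy, but the reply is produced by the backend.
{:from_backend, ^backend, :hello} = ForwardingSketch.Proxy.echo(proxy, :hello)
```

A variant that avoids the internal message format is to pass `from` along in an ordinary message and let the receiving process answer with `GenServer.reply/2`.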
<!-- livebook:{"break_markdown":true} -->
For demonstration purposes, printouts have been sprinkled throughout the code. This allows us to follow the order of execution and verify that the rate of the calls to the worker is limited.
## Define Client Pool Size
```elixir
kinos = [
  Kino.Input.number("Client pool size:", default: 5),
  # The period is passed to :timer.sleep/1, so it is in milliseconds.
  Kino.Input.number("Rate limiter period (ms):", default: 500)
]

Kino.Layout.grid(kinos)
```
```elixir
[client_pool_size, rate_limiter_period] =
  Enum.map(kinos, &Kino.Input.read/1)
```
## Implementation
```elixir
defmodule RateLimiter do
  use GenServer

  # (client) interface

  def start_link(period) do
    GenServer.start_link(__MODULE__, period, name: __MODULE__)
  end

  def start(period) do
    GenServer.start(__MODULE__, period, name: __MODULE__)
  end

  def cast(server, payload) do
    GenServer.cast(__MODULE__, {:cast, server, payload})
  end

  def call(server, payload) do
    GenServer.call(__MODULE__, {:call, server, payload})
  end

  # callbacks

  @impl true
  def init(period) do
    {:ok, period}
  end

  @impl true
  def handle_cast({:cast, server, payload}, period = state) do
    IO.puts("RateLimiter[cast]: #{inspect(payload)}")
    GenServer.cast(server, payload)
    # Sleeping here delays the next inbox message by at least `period` ms.
    :timer.sleep(period)
    {:noreply, state}
  end

  @impl true
  def handle_call({:call, server, payload}, from, period = state) do
    IO.puts("RateLimiter[call]: #{inspect(payload)}")
    # Forward the call exactly once, keeping the client's `from`, so that the
    # worker replies to the client directly. An additional GenServer.call/2
    # here would make the worker process every request twice.
    send(server, {:"$gen_call", from, payload})
    :timer.sleep(period)
    {:noreply, state}
  end
end
```
```elixir
defmodule Worker do
  use GenServer

  # (client) interface

  def start_link() do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def start() do
    GenServer.start(__MODULE__, nil, name: __MODULE__)
  end

  def process(description, client_name) do
    # RateLimiter.cast(__MODULE__, {:process, description, client_name})
    RateLimiter.call(__MODULE__, {:process, description, client_name})
  end

  # callbacks

  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_cast({:process, description, client_name}, state) do
    IO.puts("Worker[cast]: worker doing #{description} for #{client_name}")
    {:noreply, state}
  end

  @impl true
  def handle_call({:process, description, client_name} = message, _from, state) do
    IO.puts("Worker[call]: worker doing #{description} for #{client_name}")
    {:reply, {:ok, message}, state}
  end
end
```
```elixir
defmodule Client do
  use GenServer

  # (client) interface

  def start_link(name) do
    GenServer.start_link(__MODULE__, name)
  end

  def start(name) do
    GenServer.start(__MODULE__, name)
  end

  # callbacks

  @impl true
  def init(name) do
    prime()
    {:ok, %{name: name, index: 0}}
  end

  @impl true
  def handle_cast({:work}, state) do
    index = state.index
    IO.puts("#{state.name}: sending package ##{index}")
    result = Worker.process("Work package ##{index}", state.name)
    IO.puts("#{state.name}: received result #{inspect(result)}")
    prime()
    {:noreply, %{state | index: index + 1}}
  end

  # helpers

  # Schedule the next work package by casting to ourselves.
  defp prime() do
    GenServer.cast(self(), {:work})
  end
end
```
## Demo
```elixir
{:ok, worker_pid} = Worker.start()
```
```elixir
{:ok, rl_pid} = RateLimiter.start(rate_limiter_period)
```
```elixir
client_pids =
  Enum.map(1..client_pool_size, fn i ->
    {:ok, pid} = Client.start("Client ##{i}")
    pid
  end)
```
Cleanup:
```elixir
client_pids
|> Enum.concat([worker_pid, rl_pid])
|> Enum.each(&Process.exit(&1, :kill))
```
**Note:** To repeat the demo, you must re-execute all cells in the [Demo](#demo) section.
## Discussion
This form of rate limiting is producer-driven: the producer decides when to emit a value. The alternative is a consumer-driven design, where the consumer side requests work to be emitted by the producer side. That is not always an option, though. When it is an option, we should consider [GenStage](https://hexdocs.pm/gen_stage/GenStage.html) or some system that builds on it. Otherwise, one should consider relying on a queue.
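
To make the consumer-driven idea concrete, here is a minimal hand-rolled sketch in which the producer only hands out work when asked. It is an illustration of demand-driven flow using a plain GenServer, not GenStage's API. The module `PullSketch.Producer` and its `ask/2` function are hypothetical names.

```elixir
defmodule PullSketch.Producer do
  use GenServer

  def start_link(items), do: GenServer.start_link(__MODULE__, items)

  # The consumer states its demand; the producer never pushes on its own.
  def ask(producer, demand) when is_integer(demand) and demand > 0 do
    GenServer.call(producer, {:ask, demand})
  end

  @impl true
  def init(items), do: {:ok, items}

  @impl true
  def handle_call({:ask, demand}, _from, items) do
    # Hand out at most `demand` items and keep the rest for later requests.
    {batch, rest} = Enum.split(items, demand)
    {:reply, batch, rest}
  end
end

{:ok, producer} = PullSketch.Producer.start_link(Enum.to_list(1..5))

PullSketch.Producer.ask(producer, 2)  # [1, 2]
PullSketch.Producer.ask(producer, 2)  # [3, 4]
PullSketch.Producer.ask(producer, 2)  # [5]
PullSketch.Producer.ask(producer, 2)  # []
```

Because the consumer decides when to ask, it sets the pace itself, and no separate rate limiter is needed between the two sides.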