# HLS with Elixir
```elixir
Mix.install([
{:plug, "~> 1.16"},
{:ex_cmd, "~> 0.12.0"},
{:evision, "~> 0.2.7"},
{:file_system, "~> 1.0"},
{:plug_crypto, "~> 2.1"},
{:bandit, "~> 1.5"},
{:kino, "~> 0.13.2"},
{:corsica, "~> 2.1"},
{:req, "~> 0.5.2"}
])
```
## Create a WebServer listening on port 4001
We need to serve the segments and the playlist for the browser.
> Note that we need to set "CORS" on this server.
```elixir
defmodule WebServer do
  @moduledoc """
  Minimal HTTP server that serves the HLS playlist and segments stored in
  `./priv/hls`.

  `Corsica` is plugged with `origins: "*"` so that any origin may fetch the files.
  """
  use Plug.Router

  @hls_dir "./priv/hls"

  plug Corsica, origins: "*"
  plug :match
  plug :dispatch

  # GET /hls/<file>, e.g. http://localhost:4001/hls/playlist.m3u8
  # Responds with the file content, or 404 when it cannot be read.
  get "/hls/:file" do
    IO.puts "endpoint reached----"
    %{"file" => file} = conn.params

    # Path segments are URL-decoded, so `file` may contain "../" (sent as "..%2F").
    # Keeping only the basename confines reads to the HLS directory.
    path = Path.join(@hls_dir, Path.basename(file))

    case File.read(path) do
      {:ok, data} -> Plug.Conn.send_resp(conn, 200, data)
      {:error, _reason} -> Plug.Conn.send_resp(conn, 404, "Not found")
    end
  end

  # Any other route: answer 404 instead of raising for an unmatched request.
  match _ do
    Plug.Conn.send_resp(conn, 404, "Not found")
  end
end

Bandit.start_link(plug: WebServer, port: 4001)
```
## FFmpeg processes
We run two long-lived ("keep alive") `FFmpeg` processes with `ExCmd`.
The first one will extract all the frames from the received video chunk.
The second one will build HLS segments and the playlist.
```elixir
defmodule FFmpegProcessor do
  @moduledoc false

  # Resolved when the module is compiled; falls back to a fixed path when
  # `ffmpeg` is not found on the PATH.
  @ffmpeg System.find_executable("ffmpeg") || "/opt/homebrew/bin/ffmpeg"

  # Starts the two FFmpeg OS processes with `ExCmd.Process.start_link/1` and
  # returns `{pid_capture, pid_segment}`:
  #   * the capture process reads video from stdin and writes JPEG frames
  #     into ./priv/input
  #   * the segment process reads images from stdin and writes the HLS
  #     playlist and segments into ./priv/hls
  def start(frame_rate, resolution, duration) do
    {:ok, pid_capture} =
      frame_rate
      |> capture_args(resolution)
      |> ExCmd.Process.start_link()

    {:ok, pid_segment} =
      frame_rate
      |> segment_args(duration)
      |> ExCmd.Process.start_link()

    {pid_capture, pid_segment}
  end

  # Command line that splits the piped video into numbered JPEG files.
  defp capture_args(frame_rate, resolution) do
    frame_pattern = "./priv/input/test_%05d.jpg"

    ~w(#{@ffmpeg} -loglevel debug
      -i pipe:0 -framerate #{frame_rate}
      -video_size #{resolution}
      -thread_type slice
      #{frame_pattern}
    )
  end

  # Command line that encodes the piped images into HLS segments and a playlist.
  defp segment_args(frame_rate, duration) do
    playlist = Path.join("./priv/hls", "playlist.m3u8")
    segment = Path.join("./priv/hls", "segment_%03d.ts")

    ~w(#{@ffmpeg} -loglevel info
      -f image2pipe -framerate #{frame_rate}
      -i pipe:0 -c:v libx264
      -preset veryfast
      -f hls
      -hls_time #{duration}
      -hls_list_size 4
      -hls_playlist_type event
      -hls_flags append_list
      -hls_segment_filename #{segment}
      #{playlist}
    )
  end
end
##################################################################################
# ffmpeg [GeneralOptions] [InputFileOptions] -i input [OutputFileOptions] output #
##################################################################################
```
## File watcher
This module will monitor changes in the file system in the given directory.
We want to know when `FFmpeg` has built the "playlist.m3u8" file located in the directory "./priv/hls/" (the location we chose in the previous module).
When this event is detected, we send a message to the caller.
```elixir
defmodule FileWatcher do
  @moduledoc """
  GenServer that watches `./priv/hls` with `FileSystem` and notifies a process
  when an `.m3u8` file is touched.

  The init argument is the pid to notify; it receives the message
  `:playlist_created`.
  """
  use GenServer
  require Logger

  # Starts the file-system watcher on ./priv/hls and subscribes this process to it.
  @impl true
  def init(ws_pid) do
    {:ok, watcher_pid} = FileSystem.start_link(dirs: ["./priv/hls"])
    FileSystem.subscribe(watcher_pid)
    {:ok, %{watcher_pid: watcher_pid, ws_pid: ws_pid}}
  end

  # A file event from our own watcher: forward a notification when the path is
  # a playlist file.
  @impl true
  def handle_info(
        {:file_event, watcher_pid, {path, _events}},
        %{watcher_pid: watcher_pid, ws_pid: ws_pid} = state
      ) do
    Logger.debug("File created: #{path}")
    if Path.extname(path) == ".m3u8", do: send(ws_pid, :playlist_created)
    {:noreply, state}
  end

  # The watcher reports that it stopped. Without this clause the message would
  # raise a FunctionClauseError and crash this server.
  def handle_info({:file_event, watcher_pid, :stop}, %{watcher_pid: watcher_pid} = state) do
    Logger.debug("File watcher stopped")
    {:noreply, state}
  end

  # Ignore anything else rather than crashing on an unexpected message.
  def handle_info(_msg, state) do
    {:noreply, state}
  end
end
```
## Evision running the Haar Cascade face detection
We transform each frame by adding a rectangle around the ROI, if any.
We read the files and build new files.
> You will see false positives.
```elixir
defmodule ImageProcessor do
  @moduledoc """
  Face detection on single frames with the Haar cascade classifier shipped
  with Evision.
  """

  # Builds the classifier from the frontal-face cascade file found in
  # Evision's priv directory.
  def load_haar_cascade do
    haar_path =
      Path.join(
        :code.priv_dir(:evision),
        "share/opencv4/haarcascades/haarcascade_frontalface_default.xml"
      )

    Evision.CascadeClassifier.cascadeClassifier(haar_path)
  end

  # Reads `file` from ./priv/input, draws a green rectangle around every
  # detected face, writes the result under the same name in ./priv/output and
  # deletes the input file. Returns `:ok`.
  def detect_and_draw_faces(file, face_detector) do
    input_path = Path.join("./priv/input", file)
    output_path = Path.join("./priv/output", file)

    frame = Evision.imread(input_path)

    # Convert to grey-scale. `cvtColor/2` expects a colour-conversion code;
    # the imread flag IMREAD_GRAYSCALE used before is not one.
    grey_img = Evision.cvtColor(frame, Evision.Constant.cv_COLOR_BGR2GRAY())

    # Detect faces on the grey-scale image.
    faces = Evision.CascadeClassifier.detectMultiScale(face_detector, grey_img)

    # Draw the rectangles found on the original (colour) frame.
    result_frame =
      Enum.reduce(faces, frame, fn {x, y, w, h}, mat ->
        Evision.rectangle(mat, {x, y}, {x + w, y + h}, {0, 255, 0}, thickness: 2)
      end)

    Evision.imwrite(output_path, result_frame)
    :ok = File.rm!(input_path)
  end
end
```
## The main process: Kino.JS.Live
We use `Kino.JS.Live`. It runs a GenServer to handle the messages between the browser and the backend.
The API is close to a LiveView and Channel. Instead of a `socket`, we have a `context` object.
In the browser, we send a message with `ctx.pushEvent`. In the backend, the corresponding callback is a `handle_event`.
We send a message from the backend with `broadcast_event`. In the browser, the listener is `ctx.handleEvent`.
With `Kino.JS`, you load the HTML by passing the HTML string to `ctx.root.innerHTML`.
You load external libraries with `ctx.importJS`.
> We send **binary payloads** from the browser to the process.
A helper module
```elixir
defmodule Assets do
  @moduledoc false

  @js_url "https://raw.githubusercontent.com/dwyl/hls-demo/main/lib/assets/index.js"
  @html_url "https://raw.githubusercontent.com/dwyl/hls-demo/main/lib/assets/index.html"

  # Returns the body of the JavaScript asset downloaded from GitHub.
  def fetch_js, do: download(@js_url)

  # Returns the body of the HTML asset downloaded from GitHub.
  def fetch_html, do: download(@html_url)

  # Performs the GET request (raising on transport errors) and returns the body.
  defp download(url) do
    response = Req.get!(url)
    response.body
  end
end
```
```elixir
defmodule HlsLive do
  @moduledoc """
  `Kino.JS.Live` widget that drives the whole pipeline.

  Video chunks pushed by the browser are written to the capture FFmpeg process,
  the extracted frames are run through `ImageProcessor`, and the processed
  frames are piped to the segment FFmpeg process. When the `FileWatcher` reports
  the playlist for the first time, the browser is told with a `"playlist_ready"`
  event.
  """
  use Kino.JS
  use Kino.JS.Live

  @duration 5
  @frame_rate 30
  @resolution "640x480"
  # Downloaded once, when the module is compiled.
  @html Assets.fetch_html()

  def run(), do: Kino.JS.Live.new(__MODULE__, @html)

  asset "main.js" do
    Assets.fetch_js()
  end

  # Creates the working directories, starts the file watcher and both FFmpeg
  # processes, and stores everything in the context assigns.
  @impl true
  def init(html, ctx) do
    # Fail early if a working directory cannot be created.
    Enum.each(["./priv/input", "./priv/output", "./priv/hls"], &File.mkdir_p!/1)

    {:ok, watcher_pid} = GenServer.start(FileWatcher, self())

    {pid_capture, pid_segment} =
      FFmpegProcessor.start(@frame_rate, @resolution, @duration)

    ctx =
      assign(ctx, %{
        html: html,
        face_detector: ImageProcessor.load_haar_cascade(),
        pid_capture: pid_capture,
        pid_segment: pid_segment,
        pid_watcher: watcher_pid,
        map_list: MapSet.new(),
        queue: :queue.new(),
        frame_rate: @frame_rate,
        chunk_id: 1,
        ref: nil,
        init: true
      })

    {:ok, ctx}
  end

  @impl true
  def handle_connect(ctx) do
    {:ok, ctx.assigns.html, ctx}
  end

  # received from the browser-------------
  @impl true
  def handle_event("stop", _, ctx) do
    {:noreply, ctx}
  end

  # A binary video chunk: feed it to the capture FFmpeg process and trigger a
  # scan of the frames directory.
  def handle_event("chunk", {:binary, _info, buffer}, ctx) do
    %{pid_capture: pid_capture, chunk_id: chunk_id} = ctx.assigns
    IO.puts("received data ---------------#{ctx.assigns.chunk_id}")
    # Write the received binary data to the FFmpeg capture process
    :ok = ExCmd.Process.write(pid_capture, buffer)
    send(self(), :ffmpeg_process)
    ctx = assign(ctx, chunk_id: chunk_id + 1)
    {:noreply, ctx}
  end

  # received from the server--------------
  # Lists ./priv/input, enqueues the frames not seen before and records them
  # in `map_list`.
  @impl true
  def handle_info(:ffmpeg_process, ctx) do
    %{queue: queue, map_list: map_list} = ctx.assigns

    case File.ls!("./priv/input") do
      [] ->
        {:noreply, ctx}

      files ->
        new_files = MapSet.difference(MapSet.new(files), map_list)
        MapSet.size(new_files) |> IO.inspect(label: "NEW FILES")
        new_queue = :queue.in(MapSet.to_list(new_files), queue)
        map_list = MapSet.union(new_files, map_list)
        MapSet.size(map_list) |> IO.inspect(label: "MAP LIST")
        send(self(), :process_queue)
        ctx = ctx |> assign(queue: new_queue) |> assign(map_list: map_list)
        {:noreply, ctx}
    end
  end

  # Pops one batch of frames and runs face detection on it concurrently; once
  # the queue is empty, asks for a rebuild of the HLS segments.
  def handle_info(:process_queue, ctx) do
    %{queue: queue, face_detector: face_detector} = ctx.assigns

    case :queue.out(queue) do
      {{:value, files}, new_queue} ->
        :ok =
          files
          |> Task.async_stream(
            fn file ->
              :ok = ImageProcessor.detect_and_draw_faces(file, face_detector)
            end,
            max_concurrency: System.schedulers_online(),
            ordered: false
          )
          |> Stream.run()

        send(self(), :process_queue)
        {:noreply, assign(ctx, queue: new_queue)}

      {:empty, _} ->
        send(self(), :ffmpeg_rebuild)
        {:noreply, assign(ctx, queue: :queue.new())}
    end
  end

  # Runs only when `chunk_id` equals @duration: pipes the processed frames, in
  # name order, to the segment FFmpeg process, then deletes them.
  def handle_info(:ffmpeg_rebuild, %{assigns: %{chunk_id: @duration}} = ctx) do
    IO.puts("REBUILD---")
    %{map_list: map_list, pid_segment: pid_segment} = ctx.assigns

    list =
      map_list
      |> MapSet.to_list()
      |> Enum.sort()

    # The task is never awaited: its reply and :DOWN messages land in the
    # catch-all `handle_info/2` clause below.
    %{ref: ref} =
      Task.async(fn ->
        for file <- list do
          ExCmd.Process.write(pid_segment, File.read!(Path.join("./priv/output", file)))
        end

        Enum.each(list, &File.rm(Path.join("./priv/output", &1)))
      end)

    ctx = ctx |> assign(map_list: MapSet.new()) |> assign(chunk_id: 0) |> assign(ref: ref)
    {:noreply, ctx}
  end

  def handle_info(:ffmpeg_rebuild, ctx) do
    {:noreply, ctx}
  end

  # First playlist notification only: tell the browser it can start playback.
  def handle_info(:playlist_created, %{assigns: %{init: true}} = ctx) do
    IO.puts("PLAYLIST CREATED")
    broadcast_event(ctx, "playlist_ready", %{})
    {:noreply, assign(ctx, init: false)}
  end

  # Everything else (later playlist notifications, task replies, ...) is ignored.
  def handle_info(_msg, ctx) do
    {:noreply, ctx}
  end

  # The watcher is started unlinked in `init/2`, so it is shut down explicitly
  # here (best effort: this callback is not guaranteed to be invoked).
  # The return value of `terminate/2` is ignored.
  @impl true
  def terminate(_reason, ctx) do
    case ctx.assigns do
      %{pid_watcher: pid} when is_pid(pid) -> Process.exit(pid, :shutdown)
      _ -> :ok
    end

    :ok
  end
end
```
## The output
You will see your webcam displayed.
Click on "start".
After about 15 seconds, a second video element should appear below, streaming the transformed webcam feed with face detection. Et voilà!
```elixir
# Creates the HlsLive widget (HlsLive.run/0 calls Kino.JS.Live.new/2); being the
# last expression of the cell, its result is what the cell outputs.
HlsLive.run()
```