Run this notebook

Use Livebook to open this notebook and explore new ideas.

It is easy to get started, on your machine or the cloud.

Click below to open and run it in your Livebook.

(or change your Livebook location)

# HLS with Elixir

```elixir
Mix.install([
  {:plug, "~> 1.16"},
  {:ex_cmd, "~> 0.12.0"},
  {:evision, "~> 0.2.7"},
  {:file_system, "~> 1.0"},
  {:plug_crypto, "~> 2.1"},
  {:bandit, "~> 1.5"},
  {:kino, "~> 0.13.2"},
  {:corsica, "~> 2.1"},
  {:req, "~> 0.5.2"}
])
```

## Create a WebServer listening on port 4001

We need to serve the segments and the playlist for the browser.

> Note that we need to set "CORS" on this server.

```elixir
defmodule WebServer do
  @moduledoc """
  Minimal HTTP server that serves the HLS playlist and segments found in
  `./priv/hls` under the `/hls/:file` route, with CORS enabled for any origin.
  """
  use Plug.Router

  @hls_dir "./priv/hls"

  plug Corsica, origins: "*"
  plug :match
  plug :dispatch

  # http://localhost:58331
  get "/hls/:file" do
    IO.puts "endpoint reached----"

    # `file` comes from the request URL: keep only its last path component so
    # that an encoded "../" cannot escape the HLS directory.
    path = Path.join(@hls_dir, Path.basename(file))

    case File.read(path) do
      {:ok, data} -> Plug.Conn.send_resp(conn, 200, data)
      # Missing or unreadable file: answer 404 instead of raising.
      {:error, _reason} -> Plug.Conn.send_resp(conn, 404, "Not found")
    end
  end

  # Any other route: without a catch-all, Plug.Router raises on unmatched paths.
  match _ do
    Plug.Conn.send_resp(conn, 404, "Not found")
  end
end

Bandit.start_link(plug: WebServer, port: 4001)
```

## FFmpeg processes

We run two `FFmpeg` processes as "keep alive" with `ExCmd`. The first one will extract all the frames from the received video chunk. The second one will build HLS segments and the playlist.

```elixir
defmodule FFmpegProcessor do
  @moduledoc """
  Starts the two long-running `ffmpeg` OS processes used by the notebook.

  Both processes read their input from stdin (`pipe:0`):

    * the "capture" process writes the frames of the incoming video as JPEG
      files named `./priv/input/test_%05d.jpg`;
    * the "segment" process reads a stream of images (`image2pipe`), encodes
      them with `libx264` and writes HLS segments and `playlist.m3u8` into
      `./priv/hls`.
  """

  @ffmpeg System.find_executable("ffmpeg") || "/opt/homebrew/bin/ffmpeg"

  @doc """
  Starts both `ffmpeg` processes, linked to the caller.

  `frame_rate` and `resolution` are passed to `-framerate` / `-video_size`,
  `duration` to `-hls_time`.

  Returns `{pid_capture, pid_segment}`.
  """
  def start(frame_rate, resolution, duration) do
    frame_pattern = "./priv/input/test_%05d.jpg"

    # Arguments are given as an explicit list (not `~w` with interpolation) so
    # that a value containing whitespace, e.g. the ffmpeg path, stays one argument.
    build_frames = [
      @ffmpeg,
      "-loglevel", "debug",
      "-i", "pipe:0",
      "-framerate", to_string(frame_rate),
      "-video_size", to_string(resolution),
      "-thread_type", "slice",
      frame_pattern
    ]

    {:ok, pid_capture} = ExCmd.Process.start_link(build_frames)

    playlist = Path.join("./priv/hls", "playlist.m3u8")
    segment = Path.join("./priv/hls", "segment_%03d.ts")

    ffmpeg_rebuild_cmd = [
      @ffmpeg,
      "-loglevel", "info",
      "-f", "image2pipe",
      "-framerate", to_string(frame_rate),
      "-i", "pipe:0",
      "-c:v", "libx264",
      "-preset", "veryfast",
      "-f", "hls",
      "-hls_time", to_string(duration),
      "-hls_list_size", "4",
      "-hls_playlist_type", "event",
      "-hls_flags", "append_list",
      "-hls_segment_filename", segment,
      playlist
    ]

    {:ok, pid_segment} = ExCmd.Process.start_link(ffmpeg_rebuild_cmd)

    {pid_capture, pid_segment}
  end
end

##################################################################################
# ffmpeg [GeneralOptions] [InputFileOptions] -i input [OutputFileOptions] output #
##################################################################################
```

## File watcher

This module will monitor changes in the file system in the given directory. We want to know when `FFmpeg` has built the "playlist.m3u8" file located in the directory "./priv/hls/" (we decided to put it there, as set in the previous module). When this event is detected, we send a message to the caller.

```elixir
defmodule FileWatcher do
  @moduledoc """
  GenServer that watches `./priv/hls` with `FileSystem` and sends
  `:playlist_created` to the pid given at start whenever a file event is
  reported for a path with the `.m3u8` extension.
  """
  use GenServer
  require Logger

  @impl true
  def init(ws_pid) do
    {:ok, watcher_pid} = FileSystem.start_link(dirs: ["./priv/hls"])
    FileSystem.subscribe(watcher_pid)
    {:ok, %{watcher_pid: watcher_pid, ws_pid: ws_pid}}
  end

  @impl true
  def handle_info(
        {:file_event, watcher_pid, {path, _events}},
        %{watcher_pid: watcher_pid, ws_pid: ws_pid} = state
      ) do
    Logger.debug("File created: #{path}")

    if Path.extname(path) == ".m3u8", do: send(ws_pid, :playlist_created)

    {:noreply, state}
  end

  # Anything else (for instance the `{:file_event, pid, :stop}` message described
  # in the file_system README) is ignored instead of crashing the watcher with a
  # FunctionClauseError.
  def handle_info(_msg, state), do: {:noreply, state}
end
```

## Evision running the Haar Cascade face detection

We transform each frame by adding a rectangle around the ROI, if any. We read the files and build new files.

> You will see false positives.
```elixir
defmodule ImageProcessor do
  @moduledoc """
  Face detection on single frames with Evision's Haar cascade classifier.

  Frames are read from `./priv/input`, annotated with a green rectangle around
  every detected face, and written under the same file name to `./priv/output`.
  """

  @doc """
  Loads the frontal-face Haar cascade shipped in Evision's `priv` directory and
  returns the classifier.
  """
  def load_haar_cascade do
    haar_path =
      Path.join(
        :code.priv_dir(:evision),
        "share/opencv4/haarcascades/haarcascade_frontalface_default.xml"
      )

    Evision.CascadeClassifier.cascadeClassifier(haar_path)
  end

  @doc """
  Detects faces in `./priv/input/<file>`, writes the annotated frame to
  `./priv/output/<file>` and deletes the input file.

  Returns `:ok`; raises if the input file cannot be removed.
  """
  def detect_and_draw_faces(file, face_detector) do
    input_path = Path.join("./priv/input", file)
    output_path = Path.join("./priv/output", file)

    frame = Evision.imread(input_path)

    # Convert to grey-scale. `cvtColor` expects a colour-conversion code: the
    # imread flag IMREAD_GRAYSCALE has the value 0, which as a conversion code
    # means BGR -> BGRA and does not produce a grey image.
    grey_img = Evision.cvtColor(frame, Evision.Constant.cv_COLOR_BGR2GRAY())

    # detect faces
    faces = Evision.CascadeClassifier.detectMultiScale(face_detector, grey_img)

    # draw the rectangles found on the original (colour) frame
    result_frame =
      Enum.reduce(faces, frame, fn {x, y, w, h}, mat ->
        Evision.rectangle(mat, {x, y}, {x + w, y + h}, {0, 255, 0}, thickness: 2)
      end)

    Evision.imwrite(output_path, result_frame)

    # `File.rm!/1` returns :ok, which is also what callers match on.
    :ok = File.rm!(input_path)
  end
end
```

## The main process: Kino.JS.Live

We use `Kino.JS.Live`. It runs a GenServer to handle the messages between the browser and the backend. The API is close to a LiveView and Channel. Instead of a `socket`, we have a `context` object.

In the browser, we send a message with `ctx.pushEvent`. In the backend, the corresponding callback is a `handle_event`.

We send a message from the backend with `broadcast_event`. In the browser, the listener is `ctx.handleEvent`.

With `Kino.JS`, you load the HTML by passing the HTML string to `ctx.root.innerHTML`. You load external libraries with `ctx.importJS`.

> We send **binary payloads** from the browser to the process.

A helper module

```elixir
defmodule Assets do
  @moduledoc """
  Downloads the JavaScript and HTML used by the `HlsLive` widget from GitHub.
  """

  @doc "Fetches `index.js` and returns the response body."
  def fetch_js do
    github_js_url =
      "https://raw.githubusercontent.com/dwyl/hls-demo/main/lib/assets/index.js"

    Req.get!(github_js_url).body
  end

  @doc "Fetches `index.html` and returns the response body."
  def fetch_html do
    github_html_url =
      "https://raw.githubusercontent.com/dwyl/hls-demo/main/lib/assets/index.html"

    Req.get!(github_html_url).body
  end
end
```

```elixir
defmodule HlsLive do
  @moduledoc """
  `Kino.JS.Live` widget driving the pipeline:

    1. the browser pushes binary video chunks (`"chunk"` events);
    2. each chunk is written to the "capture" ffmpeg process, which produces
       JPEG frames in `./priv/input`;
    3. new frames are queued and run through `ImageProcessor`, which writes the
       annotated frames to `./priv/output`;
    4. when `chunk_id` equals `@duration`, the annotated frames are written, in
       sorted file-name order, to the "segment" ffmpeg process;
    5. on the first `:playlist_created` message (sent by `FileWatcher`), the
       browser is told that the playlist is ready.
  """
  use Kino.JS
  use Kino.JS.Live

  # Passed to ffmpeg as `-hls_time`, and also the `chunk_id` value that triggers
  # a rebuild of the HLS segments.
  @duration 5
  @frame_rate 30
  @resolution "640x480"

  # Fetched once, when this module is compiled.
  @html Assets.fetch_html()

  def run(), do: Kino.JS.Live.new(__MODULE__, @html)

  asset "main.js" do
    Assets.fetch_js()
  end

  @impl true
  def init(html, ctx) do
    # Fail early if a working directory cannot be created.
    Enum.each(["./priv/input", "./priv/output", "./priv/hls"], &File.mkdir_p!/1)

    {:ok, watcher_pid} = GenServer.start(FileWatcher, self())
    {pid_capture, pid_segment} = FFmpegProcessor.start(@frame_rate, @resolution, @duration)

    ctx =
      assign(ctx, %{
        html: html,
        face_detector: ImageProcessor.load_haar_cascade(),
        pid_capture: pid_capture,
        pid_segment: pid_segment,
        pid_watcher: watcher_pid,
        # file names already seen in ./priv/input since the last rebuild
        map_list: MapSet.new(),
        # batches of file names waiting for face detection
        queue: :queue.new(),
        frame_rate: @frame_rate,
        chunk_id: 1,
        ref: nil,
        init: true
      })

    {:ok, ctx}
  end

  @impl true
  def handle_connect(ctx) do
    {:ok, ctx.assigns.html, ctx}
  end

  # received from the browser-------------
  @impl true
  def handle_event("stop", _, ctx) do
    {:noreply, ctx}
  end

  def handle_event("chunk", {:binary, _info, buffer}, ctx) do
    %{pid_capture: pid_capture, chunk_id: chunk_id} = ctx.assigns
    IO.puts("received data ---------------#{ctx.assigns.chunk_id}")

    # Write the received binary data to the FFmpeg capture process
    :ok = ExCmd.Process.write(pid_capture, buffer)
    send(self(), :ffmpeg_process)

    {:noreply, assign(ctx, chunk_id: chunk_id + 1)}
  end

  # received from the server--------------
  @impl true
  def handle_info(:ffmpeg_process, ctx) do
    %{queue: queue, map_list: map_list} = ctx.assigns

    case File.ls!("./priv/input") do
      [] ->
        {:noreply, ctx}

      files ->
        # Only the files not seen before are queued for processing.
        new_files = MapSet.difference(MapSet.new(files), map_list)
        MapSet.size(new_files) |> IO.inspect(label: "NEW FILES")

        new_queue = :queue.in(MapSet.to_list(new_files), queue)
        map_list = MapSet.union(new_files, map_list)
        MapSet.size(map_list) |> IO.inspect(label: "MAP LIST")

        send(self(), :process_queue)

        ctx =
          ctx
          |> assign(queue: new_queue)
          |> assign(map_list: map_list)

        {:noreply, ctx}
    end
  end

  def handle_info(:process_queue, ctx) do
    %{queue: queue, face_detector: face_detector} = ctx.assigns

    case :queue.out(queue) do
      {{:value, files}, new_queue} ->
        :ok =
          files
          |> Task.async_stream(
            fn file ->
              :ok = ImageProcessor.detect_and_draw_faces(file, face_detector)
            end,
            # The option was misspelled `max_concurreny` and therefore ignored.
            max_concurrency: System.schedulers_online(),
            ordered: false
          )
          |> Stream.run()

        # Keep draining the queue, one batch per message.
        send(self(), :process_queue)
        {:noreply, assign(ctx, queue: new_queue)}

      {:empty, _} ->
        send(self(), :ffmpeg_rebuild)
        {:noreply, assign(ctx, queue: :queue.new())}
    end
  end

  # Rebuild only when `chunk_id` equals @duration.
  def handle_info(:ffmpeg_rebuild, %{assigns: %{chunk_id: @duration}} = ctx) do
    IO.puts("REBUILD---")
    %{map_list: map_list, pid_segment: pid_segment} = ctx.assigns

    # Frames are fed to ffmpeg in sorted file-name order.
    list = map_list |> MapSet.to_list() |> Enum.sort()

    %{ref: ref} =
      Task.async(fn ->
        for file <- list do
          ExCmd.Process.write(pid_segment, File.read!(Path.join("./priv/output", file)))
        end

        Enum.each(list, &File.rm(Path.join("./priv/output", &1)))
      end)

    ctx =
      ctx
      |> assign(map_list: MapSet.new())
      |> assign(chunk_id: 0)
      |> assign(ref: ref)

    {:noreply, ctx}
  end

  def handle_info(:ffmpeg_rebuild, ctx) do
    {:noreply, ctx}
  end

  # Only the first playlist event is forwarded to the browser.
  def handle_info(:playlist_created, %{assigns: %{init: true}} = ctx) do
    IO.puts("PLAYLIST CREATED")
    broadcast_event(ctx, "playlist_ready", %{})
    {:noreply, assign(ctx, init: false)}
  end

  # Everything else (later playlist events, the reply and :DOWN messages of the
  # rebuild task, ...) is ignored.
  def handle_info(_msg, ctx) do
    {:noreply, ctx}
  end

  @impl true
  def terminate(_reason, _ctx) do
    :ok
  end
end
```

## The output

We will see your webcam displayed. Click on "start". After 15s, you should see below a second video element which streams the transformed feed of the webcam with face detection.

Et voilà!

```elixir
HlsLive.run()
```
See source

Have you already installed Livebook?

If you already installed Livebook, you can configure the default Livebook location where you want to open notebooks.
Livebook up Checking status We can't reach this Livebook (but we saved your preference anyway)
Run notebook

Not yet? Install Livebook in just a minute

Livebook is open source, free, and ready to run anywhere.

Run on your machine

with Livebook Desktop

Run in the cloud

on select platforms

To run on Linux, Docker, embedded devices, or Elixir’s Mix, check our README.

PLATINUM SPONSORS
SPONSORS
Code navigation with go to definition of modules and functions Read More ×