Run this notebook

Use Livebook to open this notebook and explore new ideas.

It is easy to get started, on your machine or the cloud.

Click below to open and run it in your Livebook at .

(or change your Livebook location)

# Intro to Timescale ```elixir Mix.install([ {:timescale, "~> 0.1.0"}, {:kino_db, "~> 0.2.0"}, {:jason, "~> 1.4"}, {:postgrex, "~> 0.16.5"}, {:kino_ecto, git: "https://github.com/vorce/kino_ecto"} ]) # Add https://github.com/vorce/kino_ecto ``` ## Building a health tracker Let's learn about using the `timescale` Elixir library for [TimescaleDB](https://www.timescale.com/). For our example, let's imagine we're building a fitness application that tracks your heart rate through a wearable device. Our application will receive (literal) heartbeats from the wearable device at varying intervals that we will record into the database. ## Setting up the Repo First and foremost, please ensure you have TimescaleDB installed on your machine using the [installation guide](https://docs.timescale.com/install/latest/self-hosted/). To go through these examples, you'll need the [Timescale Toolkit](https://docs.timescale.com/timescaledb/latest/how-to-guides/hyperfunctions/install-toolkit/) installed on your machine as well. The best way to install is with Docker, or if you're brave, build from source. Once Postgres/Timescale is up in running, you'll also need to set up secrets for connecting to a database. This will also require creating a Postgres database. Try the following commands in your shell: ```shell $ psql -c 'create database timescale_fitness' ``` <!-- livebook:{"break_markdown":true} --> You'll also need to set the following secrets in Livebook * `POSTGRES_HOST` - postgres hostname to connect to (default `localhost`) * `POSTGRES_USER` - postgres username to connect with (default `postgres`) * `POSTGRES_PASS` - postgres password to connect with (default `postgres`) * `POSTGRES_DATABASE` - database name (default `timescale_fitness`) ```elixir defmodule Fitness.Repo do use Ecto.Repo, otp_app: :fitness, adapter: Ecto.Adapters.Postgres def init(_type, config) do {:ok, Keyword.merge(config, migration_lock: :pg_advisory_lock, hostname: System.get_env("LB_POSTGRES_HOST", "localhost"), port: 5432, username: System.get_env("LB_POSTGRES_USER", "postgres"), password: System.get_env("LB_POSTGRES_PASSWORD", "postgres"), # If using Fly, enable IPv6 below # socket_options: [:inet6], database: System.get_env("LB_POSTGRES_DATABASE", "timescale_fitness") )} end # Helper function for LiveBook demonstrations. def migrate({num, migration}, direction) when is_atom(migration) do {:ok, _, _} = Ecto.Migrator.with_repo(__MODULE__, fn repo -> Ecto.Migrator.run(repo, [{num, migration}], direction, all: true) end) "Successfully Migrated #{inspect(migration)}" end end alias Fitness.Repo import Ecto.Query, except: [first: 2, last: 2] {:ok, repo_pid} = Kino.start_child(Repo) ``` <!-- livebook:{"attrs":{"database":"timescale_fitness","hostname":"localhost","password":"postgres","port":5432,"type":"postgres","use_ipv6":false,"username":"postgres","variable":"conn"},"kind":"Elixir.KinoDB.ConnectionCell","livebook_object":"smart_cell"} --> ```elixir opts = [ hostname: "localhost", port: 5432, username: "postgres", password: "postgres", database: "timescale_fitness" ] {:ok, conn} = Kino.start_child({Postgrex, opts}) ``` ## Creating our hypertable migration ```elixir defmodule Fitness.Repo.Migrations.CreateHeartbeat do use Ecto.Migration import Timescale.Migration def up do create_timescaledb_extension() create_if_not_exists table(:users) do add(:fullname, :string) end create_if_not_exists(unique_index(:users, [:fullname])) create_if_not_exists table(:heartbeats, primary_key: false) do add(:timestamp, :naive_datetime_usec, null: false) add(:user_id, references(:users), null: false) end create_hypertable(:heartbeats, :timestamp) end def down do drop(table("heartbeats"), mode: :cascade) drop(table("users"), mode: :cascade) drop_timescaledb_extension() end end Repo.migrate({0, Fitness.Repo.Migrations.CreateHeartbeat}, :up) ``` ## Generating mock data To facilitate our example, let's create three users who are tracking their heartbeats. ```elixir users = [ %{fullname: "Dave Lucia"}, %{fullname: "Alex Koutmos"}, %{fullname: "Peter Ullrich"} ] Repo.insert_all("users", users, on_conflict: :nothing) query = from(u in "users", order_by: [desc: u.fullname], select: %{id: u.id, fullname: u.fullname}) [%{id: peter_id} = peter, %{id: dave_id} = dave, %{id: alex_id} = alex] = Repo.all(query) ``` Next, we've built a little module to help us simulate heartbeats for an entire day. ```elixir defmodule Fitness.Generator do @ms_in_day :timer.hours(24) @doc """ Given a date, will generate a list of heartbeats for the day, represented as a list of maps with a `timestamp` field """ @spec generate_heartbeats(%{id: integer()}, Date.t()) :: list(%{timestamp: NaiveDateTime.t()}) def generate_heartbeats(user, day) do do_generate_heartbeats(user, [], NaiveDateTime.new!(day, ~T[00:00:00.000]), 0) end defp do_generate_heartbeats(user, heartbeats, day, ms) do # keep it between 60-200 beats per minute next = floor(:timer.minutes(1) / Enum.random(60..200)) + ms if next < @ms_in_day do heartbeat = %{timestamp: NaiveDateTime.add(day, next, :millisecond), user_id: user.id} do_generate_heartbeats(user, [heartbeat | heartbeats], day, next) else Enum.reverse(heartbeats) end end end # TODO try method in https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit batch_insert = fn heartbeats -> heartbeats |> Enum.chunk_every(100) |> Enum.map(fn chunk -> Repo.insert_all("heartbeats", chunk) end) end ``` <!-- livebook:{"branch_parent_index":3} --> ## Generate small dataset Next, we generate heartbeats for one user, and batch insert them into the database. ```elixir batch_insert.(Fitness.Generator.generate_heartbeats(alex, ~D[2022-09-22])) ``` ## Querying with Timescale Now that we have a dataset generated, let's try some queries using Timescale [hyperfunctions](https://docs.timescale.com/api/latest/hyperfunctions/). Let's start with the basics, and try to get the first and last values in our timeseries, using the [first](https://docs.timescale.com/api/latest/hyperfunctions/first/) and [last](https://docs.timescale.com/api/latest/hyperfunctions/last/) hyperfunctions. ```elixir import Timescale.Hyperfunctions Repo.all( from(h in "heartbeats", where: h.user_id == ^alex_id, select: {first(h.timestamp, h.timestamp), last(h.timestamp, h.timestamp)} ) ) ``` Ok, so not so interesting, but we can validate that our data starts roughly at the beginning of today and ends towards midnight. For more of a challenge, let's use the [time_bucket](https://docs.timescale.com/api/latest/hyperfunctions/time_bucket/) hyperfunction to calculate average BPM for each hour. First, let's explore how the `time_bucket` hyperfunction works. ```elixir Repo.all( from(h in "heartbeats", where: h.user_id == ^alex_id, select: {h.timestamp, time_bucket(h.timestamp, "1 second")}, limit: 5 ) ) ``` The first item in each tuple is the actual timestamp of the heartbeat, down to the microsecond. The second item is the result of `time_bucket/2` on the timestamp, bucketed down to the nearest second. `time_bucket/2` acts like the [floor/1](https://hexdocs.pm/elixir/Kernel.html#floor/1) math function, and as we'll see in a moment, enables further aggregation over time-series. ```elixir Repo.all( from(h in "heartbeats", where: h.user_id == ^alex_id, group_by: selected_as(:minute), select: %{ minute: selected_as(time_bucket(h.timestamp, "1 minute"), :minute), bpm: count(h) }, limit: 5 ) ) ``` <!-- livebook:{"branch_parent_index":3} --> ## Performance with large datasets So far, we've only taken a look at a relatively small dataset. ```elixir Repo.one(from(h in "heartbeats", select: count(h))) ``` Let's make this way more interesting by generating a months worth of heartbeats for our second user. As shown above, that should put us around 5 million rows of data. Go grab a soda, as this is gonna take a little while to execute. ```elixir for date <- Date.range(~D[2022-10-01], ~D[2022-10-31]) do batch_insert.(Fitness.Generator.generate_heartbeats(dave, date)) end ``` <!-- livebook:{"attrs":{"cache_query":true,"connection":{"type":"postgres","variable":"conn"},"query":"select * from approximate_row_count('heartbeats');","result_variable":"result3","timeout":null},"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} --> ```elixir result3 = Postgrex.query!(conn, "select * from approximate_row_count('heartbeats');", []) ``` Now that we have a large dataset, let's retry our previous queries and see how they perform. <!-- livebook:{"attrs":{"cache_query":true,"connection":{"type":"postgres","variable":"conn"},"query":"explain analyze select first(h.timestamp, h.timestamp), last(h.timestamp, h.timestamp) from heartbeats as h where user_id = 1;","result_variable":"result4","timeout":null},"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} --> ```elixir result4 = Postgrex.query!( conn, "explain analyze select first(h.timestamp, h.timestamp), last(h.timestamp, h.timestamp) from heartbeats as h where user_id = 1;", [] ) ``` Wow! They're still lightning fast. Timescale changes the query plan to make use of the hypertable architecture. Each chunk of our hypertable is aware of the time column, and can easily eliminate rows in bulk from our query plan. ## Compression One of the most compelling features of TimescaleDB is its ability to compress data. While data storage may be relatively cheap, the rate at which time-series data grows changes the equation a bit! <!-- livebook:{"attrs":{"cache_query":true,"connection":{"type":"postgres","variable":"conn"},"query":"select pg_size_pretty(hypertable_size('heartbeats')) as heartbeats_size","result_variable":"result5","timeout":null},"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} --> ```elixir result5 = Postgrex.query!( conn, "select pg_size_pretty(hypertable_size('heartbeats')) as heartbeats_size", [] ) ``` ```elixir defmodule Fitness.Repo.Migrations.CompressHeartbeats do use Ecto.Migration import Timescale.Migration def up do enable_hypertable_compression(:heartbeats, segment_by: :user_id) add_compression_policy(:heartbeats, "7 days") end def down do execute("SELECT decompress_chunk(i, true) from show_chunks('heartbeats') i;") flush() remove_compression_policy(:test_hypertable, if_exists: true) disable_hypertable_compression(:heartbeats) end end Repo.migrate({1, Fitness.Repo.Migrations.CompressHeartbeats}, :up) ``` Now that we've compressed the table, we should observe that its size as been dramatically reduced. For some datasets, its common to see compression ratios of up to 96%. <!-- livebook:{"attrs":{"cache_query":true,"connection":{"type":"postgres","variable":"conn"},"query":"select pg_size_pretty(hypertable_size('heartbeats')) as heartbeats_size","result_variable":"result6","timeout":null},"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} --> ```elixir result6 = Postgrex.query!( conn, "select pg_size_pretty(hypertable_size('heartbeats')) as heartbeats_size", [] ) ``` ## Continuous Aggregates As should be clear by now, aggregating data over time is extremely common when working with time-series data. Often, you'll want to compute aggregations across your dataset, grouped by various attributes such as your users, location, and of course, time. TimescaleDB has a feature called [Continuous Aggregates](https://docs.timescale.com/timescaledb/latest/how-to-guides/continuous-aggregates/) that dramatically improves the performance of expensive aggregations. It accomplishes this with some clever usage of [materialized views](https://www.postgresql.org/docs/current/rules-materializedviews.html), [triggers](https://www.postgresql.org/docs/15/trigger-definition.html), and hypertables, to produce views on data that update in real-time and query instaneously. Let's build a continuous aggregate for calcuating the min and max daily BPM for each user. <!-- livebook:{"attrs":{"cache_query":true,"connection":{"type":"postgres","variable":"conn"},"query":"CREATE MATERIALIZED VIEW heartbeats_minutely\nWITH (timescaledb.continuous) AS\n SELECT time_bucket('1 minute', h.timestamp) as minute, count(h) as bpm, user_id\n FROM heartbeats as h\n GROUP BY minute, user_id","result_variable":"result7","timeout":null},"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} --> ```elixir result7 = Postgrex.query!( conn, """ CREATE MATERIALIZED VIEW heartbeats_minutely WITH (timescaledb.continuous) AS SELECT time_bucket('1 minute', h.timestamp) as minute, count(h) as bpm, user_id FROM heartbeats as h GROUP BY minute, user_id """, [] ) ``` <!-- livebook:{"attrs":{"cache_query":true,"connection":{"type":"postgres","variable":"conn"},"query":"SELECT add_retention_policy('heartbeats', INTERVAL '1 year');","result_variable":"result8","timeout":null},"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} --> ```elixir result8 = Postgrex.query!( conn, "SELECT add_retention_policy('heartbeats', INTERVAL '1 year');", [] ) ``` ## Filling gaps in time-series **Warning: this section requires the [TimescaleDB Toolkit](https://docs.timescale.com/timescaledb/latest/how-to-guides/hyperfunctions/install-toolkit/) to be installed, which may require building from source.** ```elixir defmodule Fitness.Repo.Migrations.AddTimescaleToolkit do use Ecto.Migration import Timescale.Migration def up do create_timescaledb_toolkit_extension() end def down do drop_timescaledb_toolkit_extension() end end Repo.migrate({3, Fitness.Repo.Migrations.AddTimescaleToolkit}, :up) ``` With any system, there are opportunities for the system to go down. This could be due to a datacenter outage, global DNS being attacked, or simply an application configuration mishap. Regardless, it is possible for gaps in the collection of our data, which poses problems for calculations and aggregations. Luckily, TimescaleDB provides hyperfunctions for gapfilling and interpoltion so smooth over these gaps. Let's add some heartbeats in for another user every second, skipping over a few. ```elixir Repo.insert_all("heartbeats", [ %{timestamp: ~N[2022-11-01 09:00:00], user_id: peter_id}, %{timestamp: ~N[2022-11-01 09:00:01], user_id: peter_id}, # No heartbeat at 9:00:02 %{timestamp: ~N[2022-11-01 09:00:03], user_id: peter_id}, # No heartbeat at 9:00:04 %{timestamp: ~N[2022-11-01 09:00:05], user_id: peter_id} ]) ``` Now we've got 4 heartbeats, with two missing at seconds 2 and 4. We can use the `time_bucket_gapfill/2` function to fill in heartbeats. ```elixir Repo.all( from(h in "heartbeats", where: h.user_id == ^peter_id, select: %{ second: selected_as(time_bucket_gapfill(h.timestamp, "1 second"), :second), user_id: h.user_id }, where: h.timestamp >= ^~N[2022-11-01 09:00:00] and h.timestamp <= ^~N[2022-11-01 09:00:05], group_by: [selected_as(:second), h.user_id] ) ) ```
See source

Have you already installed Livebook?

If you already installed Livebook, you can configure the default Livebook location where you want to open notebooks.
Livebook up Checking status We can't reach this Livebook (but we saved your preference anyway)
Run notebook

Not yet? Install Livebook in just a minute

Livebook is open source, free, and ready to run anywhere.

Run on your machine

with Livebook Desktop

Run in the cloud

on select platforms

To run on Linux, Docker, embedded devices, or Elixir’s Mix, check our README.

PLATINUM SPONSORS
SPONSORS
Code navigation with go to definition of modules and functions Read More ×