# Intro to Timescale
```elixir
Mix.install([
{:timescale, "~> 0.1.0"},
{:kino_db, "~> 0.2.0"},
{:jason, "~> 1.4"},
{:postgrex, "~> 0.16.5"},
{:kino_ecto, git: "https://github.com/vorce/kino_ecto"}
])
```
## Building a health tracker
Let's learn about using the `timescale` Elixir library for [TimescaleDB](https://www.timescale.com/).
For our example, let's imagine we're building a fitness application that tracks your heart rate through a wearable device. Our application will receive (literal) heartbeats from the wearable device at varying intervals that we will record into the database.
## Setting up the Repo
First and foremost, please ensure you have TimescaleDB installed on your machine using the [installation guide](https://docs.timescale.com/install/latest/self-hosted/). To go through these examples, you'll also need the [Timescale Toolkit](https://docs.timescale.com/timescaledb/latest/how-to-guides/hyperfunctions/install-toolkit/) installed. The easiest way to install both is with Docker or, if you're feeling brave, by building from source.
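If you go the Docker route, something like the following should get you a local instance. (This is a sketch: the `timescale/timescaledb-ha` image bundles the Toolkit, but double-check the installation guide for the current image name and tag.)
```shell
$ docker run -d --name timescaledb \
    -p 5432:5432 \
    -e POSTGRES_PASSWORD=postgres \
    timescale/timescaledb-ha:pg14-latest
```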
Once Postgres/Timescale is up and running, you'll need to create a database and set up secrets for connecting to it. First, create the database from your shell:
```shell
$ psql -c 'create database timescale_fitness'
```
<!-- livebook:{"break_markdown":true} -->
You'll also need to set the following secrets in Livebook:
* `POSTGRES_HOST` - postgres hostname to connect to (default `localhost`)
* `POSTGRES_USER` - postgres username to connect with (default `postgres`)
* `POSTGRES_PASSWORD` - postgres password to connect with (default `postgres`)
* `POSTGRES_DATABASE` - database name (default `timescale_fitness`)
```elixir
defmodule Fitness.Repo do
use Ecto.Repo,
otp_app: :fitness,
adapter: Ecto.Adapters.Postgres
def init(_type, config) do
{:ok,
Keyword.merge(config,
migration_lock: :pg_advisory_lock,
hostname: System.get_env("LB_POSTGRES_HOST", "localhost"),
port: 5432,
username: System.get_env("LB_POSTGRES_USER", "postgres"),
password: System.get_env("LB_POSTGRES_PASSWORD", "postgres"),
# If using Fly, enable IPv6 below
# socket_options: [:inet6],
database: System.get_env("LB_POSTGRES_DATABASE", "timescale_fitness")
)}
end
# Helper function for Livebook demonstrations.
def migrate({num, migration}, direction) when is_atom(migration) do
{:ok, _, _} =
Ecto.Migrator.with_repo(__MODULE__, fn repo ->
Ecto.Migrator.run(repo, [{num, migration}], direction, all: true)
end)
"Successfully Migrated #{inspect(migration)}"
end
end
alias Fitness.Repo
import Ecto.Query, except: [first: 2, last: 2]
{:ok, repo_pid} = Kino.start_child(Repo)
```
<!-- livebook:{"attrs":{"database":"timescale_fitness","hostname":"localhost","password":"postgres","port":5432,"type":"postgres","use_ipv6":false,"username":"postgres","variable":"conn"},"kind":"Elixir.KinoDB.ConnectionCell","livebook_object":"smart_cell"} -->
```elixir
opts = [
hostname: "localhost",
port: 5432,
username: "postgres",
password: "postgres",
database: "timescale_fitness"
]
{:ok, conn} = Kino.start_child({Postgrex, opts})
```
## Creating our hypertable migration
```elixir
defmodule Fitness.Repo.Migrations.CreateHeartbeat do
use Ecto.Migration
import Timescale.Migration
def up do
create_timescaledb_extension()
create_if_not_exists table(:users) do
add(:fullname, :string)
end
create_if_not_exists(unique_index(:users, [:fullname]))
create_if_not_exists table(:heartbeats, primary_key: false) do
add(:timestamp, :naive_datetime_usec, null: false)
add(:user_id, references(:users), null: false)
end
create_hypertable(:heartbeats, :timestamp)
end
def down do
drop(table("heartbeats"), mode: :cascade)
drop(table("users"), mode: :cascade)
drop_timescaledb_extension()
end
end
Repo.migrate({0, Fitness.Repo.Migrations.CreateHeartbeat}, :up)
```
## Generating mock data
To facilitate our example, let's create three users who are tracking their heartbeats.
```elixir
users = [
%{fullname: "Dave Lucia"},
%{fullname: "Alex Koutmos"},
%{fullname: "Peter Ullrich"}
]
Repo.insert_all("users", users, on_conflict: :nothing)
query =
from(u in "users", order_by: [desc: u.fullname], select: %{id: u.id, fullname: u.fullname})
[%{id: peter_id} = peter, %{id: dave_id} = dave, %{id: alex_id} = alex] = Repo.all(query)
```
Next, we've built a little module to help us simulate heartbeats for an entire day.
```elixir
defmodule Fitness.Generator do
@ms_in_day :timer.hours(24)
@doc """
Given a date, will generate a list of heartbeats for the day,
represented as a list of maps with a `timestamp` field
"""
@spec generate_heartbeats(%{id: integer()}, Date.t()) :: list(%{timestamp: NaiveDateTime.t()})
def generate_heartbeats(user, day) do
do_generate_heartbeats(user, [], NaiveDateTime.new!(day, ~T[00:00:00.000]), 0)
end
defp do_generate_heartbeats(user, heartbeats, day, ms) do
# keep it between 60-200 beats per minute
next = floor(:timer.minutes(1) / Enum.random(60..200)) + ms
if next < @ms_in_day do
heartbeat = %{timestamp: NaiveDateTime.add(day, next, :millisecond), user_id: user.id}
do_generate_heartbeats(user, [heartbeat | heartbeats], day, next)
else
Enum.reverse(heartbeats)
end
end
end
# Insert in chunks of 100 rows to stay well under Postgres's 65,535 query
# parameter limit. TODO: try the method in
# https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit
batch_insert = fn heartbeats ->
  heartbeats
  |> Enum.chunk_every(100)
  |> Enum.map(fn chunk ->
    Repo.insert_all("heartbeats", chunk)
  end)
end
```
<!-- livebook:{"branch_parent_index":3} -->
## Generate small dataset
Next, we generate heartbeats for one user, and batch insert them into the database.
```elixir
batch_insert.(Fitness.Generator.generate_heartbeats(alex, ~D[2022-09-22]))
```
## Querying with Timescale
Now that we have a dataset generated, let's try some queries using Timescale [hyperfunctions](https://docs.timescale.com/api/latest/hyperfunctions/). Let's start with the basics, and try to get the first and last values in our timeseries, using the [first](https://docs.timescale.com/api/latest/hyperfunctions/first/) and [last](https://docs.timescale.com/api/latest/hyperfunctions/last/) hyperfunctions.
```elixir
import Timescale.Hyperfunctions
Repo.all(
from(h in "heartbeats",
where: h.user_id == ^alex_id,
select: {first(h.timestamp, h.timestamp), last(h.timestamp, h.timestamp)}
)
)
```
Ok, so not so interesting, but we can validate that our data starts roughly at the beginning of the day and ends just before midnight.
For more of a challenge, let's use the [time_bucket](https://docs.timescale.com/api/latest/hyperfunctions/time_bucket/) hyperfunction to calculate a BPM for each minute of the day.
First, let's explore how the `time_bucket` hyperfunction works.
```elixir
Repo.all(
from(h in "heartbeats",
where: h.user_id == ^alex_id,
select: {h.timestamp, time_bucket(h.timestamp, "1 second")},
limit: 5
)
)
```
The first item in each tuple is the actual timestamp of the heartbeat, down to the microsecond. The second item is the result of `time_bucket/2` on the timestamp, bucketed down to the nearest second. `time_bucket/2` acts like the [floor/1](https://hexdocs.pm/elixir/Kernel.html#floor/1) math function, and as we'll see in a moment, enables further aggregation over time-series data. Next, let's bucket the timestamps by minute and count the heartbeats in each bucket, which gives us the BPM for each minute.
```elixir
Repo.all(
from(h in "heartbeats",
where: h.user_id == ^alex_id,
group_by: selected_as(:minute),
select: %{
minute: selected_as(time_bucket(h.timestamp, "1 minute"), :minute),
bpm: count(h)
},
limit: 5
)
)
```
<!-- livebook:{"branch_parent_index":3} -->
## Performance with large datasets
So far, we've only taken a look at a relatively small dataset.
```elixir
Repo.one(from(h in "heartbeats", select: count(h)))
```
Let's make this way more interesting by generating a month's worth of heartbeats for our second user. As shown above, that should put us around 5 million rows of data. Go grab a soda, as this is gonna take a little while to execute.
```elixir
for date <- Date.range(~D[2022-10-01], ~D[2022-10-31]) do
batch_insert.(Fitness.Generator.generate_heartbeats(dave, date))
end
```
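Rather than running a full `count(*)` over millions of rows, below we lean on Timescale's `approximate_row_count` function, which estimates the count from catalog statistics instead of scanning the table.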
<!-- livebook:{"attrs":{"cache_query":true,"connection":{"type":"postgres","variable":"conn"},"query":"select * from approximate_row_count('heartbeats');","result_variable":"result3","timeout":null},"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} -->
```elixir
result3 = Postgrex.query!(conn, "select * from approximate_row_count('heartbeats');", [])
```
Now that we have a large dataset, let's retry our previous queries and see how they perform.
<!-- livebook:{"attrs":{"cache_query":true,"connection":{"type":"postgres","variable":"conn"},"query":"explain analyze select first(h.timestamp, h.timestamp), last(h.timestamp, h.timestamp) from heartbeats as h where user_id = 1;","result_variable":"result4","timeout":null},"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} -->
```elixir
result4 =
Postgrex.query!(
conn,
"explain analyze select first(h.timestamp, h.timestamp), last(h.timestamp, h.timestamp) from heartbeats as h where user_id = 1;",
[]
)
```
Wow! They're still lightning fast. Timescale changes the query plan to take advantage of the hypertable architecture: each chunk of our hypertable covers a known range of the time column, so the planner can exclude entire chunks, eliminating rows in bulk from our query plan.
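To peek under the hood, we can list the chunks that back our hypertable with TimescaleDB's `show_chunks` function:
```elixir
Postgrex.query!(conn, "SELECT show_chunks('heartbeats');", [])
```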
## Compression
One of the most compelling features of TimescaleDB is its ability to compress data. While data storage may be relatively cheap, the rate at which time-series data grows changes the equation a bit!
<!-- livebook:{"attrs":{"cache_query":true,"connection":{"type":"postgres","variable":"conn"},"query":"select pg_size_pretty(hypertable_size('heartbeats')) as heartbeats_size","result_variable":"result5","timeout":null},"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} -->
```elixir
result5 =
Postgrex.query!(
conn,
"select pg_size_pretty(hypertable_size('heartbeats')) as heartbeats_size",
[]
)
```
```elixir
defmodule Fitness.Repo.Migrations.CompressHeartbeats do
use Ecto.Migration
import Timescale.Migration
def up do
enable_hypertable_compression(:heartbeats, segment_by: :user_id)
add_compression_policy(:heartbeats, "7 days")
end
def down do
execute("SELECT decompress_chunk(i, true) from show_chunks('heartbeats') i;")
flush()
remove_compression_policy(:heartbeats, if_exists: true)
disable_hypertable_compression(:heartbeats)
end
end
Repo.migrate({1, Fitness.Repo.Migrations.CompressHeartbeats}, :up)
```
Now that we've compressed the table, we should observe that its size has been dramatically reduced. For some datasets, it's common to see compression ratios of up to 96%.
<!-- livebook:{"attrs":{"cache_query":true,"connection":{"type":"postgres","variable":"conn"},"query":"select pg_size_pretty(hypertable_size('heartbeats')) as heartbeats_size","result_variable":"result6","timeout":null},"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} -->
```elixir
result6 =
Postgrex.query!(
conn,
"select pg_size_pretty(hypertable_size('heartbeats')) as heartbeats_size",
[]
)
```
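For a more detailed before-and-after breakdown, we can ask TimescaleDB for compression statistics. A quick sketch using the `hypertable_compression_stats` function:
```elixir
Postgrex.query!(
  conn,
  """
  SELECT pg_size_pretty(before_compression_total_bytes) AS before_size,
         pg_size_pretty(after_compression_total_bytes) AS after_size
  FROM hypertable_compression_stats('heartbeats');
  """,
  []
)
```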
## Continuous Aggregates
As should be clear by now, aggregating data over time is extremely common when working with time-series data. Often, you'll want to compute aggregations across your dataset, grouped by various attributes such as your users, location, and of course, time.
TimescaleDB has a feature called [Continuous Aggregates](https://docs.timescale.com/timescaledb/latest/how-to-guides/continuous-aggregates/) that dramatically improves the performance of expensive aggregations. It accomplishes this with some clever usage of [materialized views](https://www.postgresql.org/docs/current/rules-materializedviews.html), [triggers](https://www.postgresql.org/docs/15/trigger-definition.html), and hypertables to produce views on data that update in real time and can be queried near-instantaneously.
Let's build a continuous aggregate that calculates each user's BPM for every minute, which we could later roll up into daily min and max BPM.
<!-- livebook:{"attrs":{"cache_query":true,"connection":{"type":"postgres","variable":"conn"},"query":"CREATE MATERIALIZED VIEW heartbeats_minutely\nWITH (timescaledb.continuous) AS\n SELECT time_bucket('1 minute', h.timestamp) as minute, count(h) as bpm, user_id\n FROM heartbeats as h\n GROUP BY minute, user_id","result_variable":"result7","timeout":null},"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} -->
```elixir
result7 =
Postgrex.query!(
conn,
"""
CREATE MATERIALIZED VIEW heartbeats_minutely
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 minute', h.timestamp) as minute, count(h) as bpm, user_id
FROM heartbeats as h
GROUP BY minute, user_id
""",
[]
)
```
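While we're at it, we can also add a [retention policy](https://docs.timescale.com/timescaledb/latest/how-to-guides/data-retention/) so that raw heartbeats older than a year are automatically dropped, while the continuous aggregate can hold onto the rolled-up data.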
<!-- livebook:{"attrs":{"cache_query":true,"connection":{"type":"postgres","variable":"conn"},"query":"SELECT add_retention_policy('heartbeats', INTERVAL '1 year');","result_variable":"result8","timeout":null},"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} -->
```elixir
result8 =
Postgrex.query!(
conn,
"SELECT add_retention_policy('heartbeats', INTERVAL '1 year');",
[]
)
```
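The continuous aggregate behaves like any other table or view, so we can query it straight from Ecto. For instance, a quick peek at the first few minutes of Dave's BPM:
```elixir
Repo.all(
  from(h in "heartbeats_minutely",
    where: h.user_id == ^dave_id,
    order_by: [asc: h.minute],
    select: %{minute: h.minute, bpm: h.bpm},
    limit: 5
  )
)
```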
## Filling gaps in time-series
**Warning: this section requires the [TimescaleDB Toolkit](https://docs.timescale.com/timescaledb/latest/how-to-guides/hyperfunctions/install-toolkit/) to be installed, which may require building from source.**
```elixir
defmodule Fitness.Repo.Migrations.AddTimescaleToolkit do
use Ecto.Migration
import Timescale.Migration
def up do
create_timescaledb_toolkit_extension()
end
def down do
drop_timescaledb_toolkit_extension()
end
end
Repo.migrate({3, Fitness.Repo.Migrations.AddTimescaleToolkit}, :up)
```
With any system, there's always a chance it goes down. This could be due to a datacenter outage, global DNS being attacked, or simply an application configuration mishap. Regardless of the cause, gaps can appear in our collected data, which poses problems for calculations and aggregations.
Luckily, TimescaleDB provides hyperfunctions for gapfilling and interpolation to smooth over these gaps.
Let's add heartbeats for another user, one per second, skipping over a few.
```elixir
Repo.insert_all("heartbeats", [
%{timestamp: ~N[2022-11-01 09:00:00], user_id: peter_id},
%{timestamp: ~N[2022-11-01 09:00:01], user_id: peter_id},
# No heartbeat at 9:00:02
%{timestamp: ~N[2022-11-01 09:00:03], user_id: peter_id},
# No heartbeat at 9:00:04
%{timestamp: ~N[2022-11-01 09:00:05], user_id: peter_id}
])
```
Now we've got four heartbeats, with two missing at seconds 2 and 4. We can use the `time_bucket_gapfill/2` hyperfunction to fill in the missing buckets.
```elixir
Repo.all(
from(h in "heartbeats",
where: h.user_id == ^peter_id,
select: %{
second: selected_as(time_bucket_gapfill(h.timestamp, "1 second"), :second),
user_id: h.user_id
},
where:
h.timestamp >= ^~N[2022-11-01 09:00:00] and
h.timestamp <= ^~N[2022-11-01 09:00:05],
group_by: [selected_as(:second), h.user_id]
)
)
```
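The gapfilled buckets come back with `NULL` for any aggregated values. TimescaleDB also provides the `locf` ("last observation carried forward") and `interpolate` hyperfunctions to fill those in. Here's a minimal sketch in raw SQL, counting beats per second and carrying the last count forward over the gaps:
```elixir
Postgrex.query!(
  conn,
  """
  SELECT time_bucket_gapfill('1 second', timestamp) AS second,
         locf(count(*)) AS beats
  FROM heartbeats
  WHERE user_id = $1
    AND timestamp >= '2022-11-01 09:00:00'
    AND timestamp <= '2022-11-01 09:00:05'
  GROUP BY second
  """,
  [peter_id]
)
```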