How to Do Database Query with SQLX in Rust
Introduction
- What is SQLx?
- Key features of SQLx
Setting Up Your Rust Project
> cargo new --bin sqlx-emaple
Creating binary (application) `sqlx-emaple` package
note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
- Adding dependencies (sqlx, tokio, etc.)
[dependencies]
anyhow = "1.0.92"
serde = { version = "1.0.214", features = ["derive"] }
chrono = { version = "0.4.38", features = ["serde"] }
sqlx = { version = "0.8.2", features = [
"chrono",
"postgres",
"runtime-tokio",
"tls-rustls",
] }
thiserror = "2.0.1"
tokio = { version = "1.41.1", features = ["rt", "rt-multi-thread", "macros"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
- Setting up a local database instance
~> createdb my_todo_list --template=template0
~> psql -U db_manager -d my_todo_list
psql (14.12 (Homebrew))
Type "help" for help.
my_todo_list=> \q
~> psql -U db_manager -d my_todo_list
psql (14.12 (Homebrew))
Type "help" for help.
my_todo_list=> \dt
Did not find any relations.
my_todo_list=> \conninfo
You are connected to database "my_todo_list" as user "db_manager" via socket in "/tmp" at port "5432".
my_todo_list=>
- Configuring the connection string
sqlx-emaple (master)> touch .env
# set this env data in .env file
DATABASE_URL = "postgres://db_manager:super_admin8801@localhost:5432/my_todo_list"
Connecting to the Database
- Establishing a Connection
- Using
sqlx::Pool
to manage connections
use sqlx::PgPool;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Hello, world!");
let pool = PgPool::connect("postgres://db_manager:xxxx@localhost:5432/my_todo_list")
.await?;
println!("connected database: {:?}", pool);
Ok(())
}
sqlx-emaple (master)> cargo run .
Hello, world!
connected database: Pool { size: 1, num_idle: 1, is_closed: false, options: PoolOptions { max_connections: 10, min_connections: 0, connect_timeout: 30s, max_lifetime: Some(1800s), idle_timeout: Some(600s), test_before_acquire: true } }
Create Table in DB
- install
sqlx-cli
> cargo install sqlx-cli
Finished `release` profile [optimized] target(s) in 1m 48s
Replacing ~/.cargo/bin/cargo-sqlx
Replacing ~/.cargo/bin/sqlx
Replaced package `sqlx-cli v0.7.4` with `sqlx-cli v0.8.2` (executables `cargo-sqlx`, `sqlx`)
- Sqlx Migrate
sqlx-emaple (master)> sqlx migrate add initial
Creating migrations/20241116140325_initial.sql
- Create Tables SQL
-- Add migration script here
CREATE TABLE IF NOT EXISTS users (
id bigserial PRIMARY KEY,
username VARCHAR(64) NOT NULL UNIQUE,
email VARCHAR(64) NOT NULL UNIQUE,
created_at timestamptz DEFAULT CURRENT_TIMESTAMP,
updated_at timestamptz DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS todos (
id bigserial PRIMARY KEY,
user_id bigint NOT NULL REFERENCES users(id),
title text NOT NULL,
description text NOT NULL DEFAULT '',
status SMALLINT NOT NULL DEFAULT 0,
created_at timestamptz DEFAULT CURRENT_TIMESTAMP,
updated_at timestamptz DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_user_created_at ON todos (user_id, created_at);
CREATE INDEX idx_user_updated_at ON todos (user_id, updated_at);
CREATE INDEX idx_user_title ON todos (user_id, title);
Apply SQL Migrations
sqlx-emaple (master)> sqlx migrate run
Applied 20241116140325/migrate initial (36.58975ms)
The table todos
and users
is been created.
my_todo_list=> \dt
List of relations
Schema | Name | Type | Owner
--------+------------------+-------+------------
public | _sqlx_migrations | table | db_manager
public | todos | table | db_manager
public | users | table | db_manager
(3 rows)
my_todo_list=> select * from todos;
id | user_id | title | description | status | created_at | updated_at
----+---------+-------+-------------+--------+------------+------------
(0 rows)
my_todo_list=> select * from users;
id | username | email | created_at | updated_at
----+----------+-------+------------+------------
(0 rows)
Set Models
Set the data model(struct
) for each table
sqlx-example/src/models.rs
When set model for a Row, use FromRow
macro to do map the Row to the Struct:
mapping Rows to Structs, Using #[derive(sqlx::FromRow)]
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
#[derive(Debug, FromRow, Serialize, Deserialize)]
pub struct Todo {
pub id: i64,
pub user_id: i64,
pub title: String,
pub description: String,
pub status: i16,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, FromRow, Serialize, Deserialize, PartialEq)]
pub struct User {
pub id: i64,
pub username: String,
pub email: String,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
Executing Queries
Create User
sqlx-example/src/user.rs
- Use
sqlx::query_as
to set the SQL that to create the User.
Execute a single SQL query as a prepared statement (transparently cached).Maps rows to Rust types using [
FromRow
].
- Use
bind
to set the values in the SQL. - Use
fetch_one
to get one row from the DB.
Execute the query, returning the first row or [
Error::RowNotFound
] otherwise.
use serde::Deserialize;
use sqlx::PgPool;
use crate::{errors::AppError, models::User};
#[derive(Debug, Deserialize)]
pub struct CreateUser {
pub username: String,
pub email: String,
}
pub async fn create_user(input: &CreateUser, pool: &PgPool) -> Result<User, AppError> {
let user = sqlx::query_as("INSERT INTO users(username, email) VALUES($1, $2) RETURNING id,username,email,created_at,updated_at")
.bind(&input.username)
.bind(&input.email)
.fetch_one(pool)
.await?;
Ok(user)
}
Get Uer by email and id
- Use
fetch_optional
to get oneOption<Row>
.
Execute the query, returning the first row or
None
otherwise.
pub async fn get_user_by_email(email: &str, pool: &PgPool) -> Result<Option<User>, AppError> {
let user = sqlx::query_as(
"SELECT id,username,email,created_at,updated_at FROM users WHERE email=$1",
)
.bind(email)
.fetch_optional(pool)
.await?;
Ok(user)
}
pub async fn get_user_by_id(id: i64, pool: &PgPool) -> Result<Option<User>, AppError> {
let user =
sqlx::query_as("SELECT id,username,email,created_at,updated_at FROM users WHERE id=$1")
.bind(id)
.fetch_optional(pool)
.await?;
Ok(user)
}
Create Todo
In sqlx-example/src/todolist.rs
use serde::Deserialize;
use sqlx::PgPool;
use crate::{errors::AppError, models::Todo};
#[derive(Debug, Deserialize)]
pub struct CreateTodo {
pub user_id: i64,
pub title: String,
pub description: String,
pub status: i16,
}
pub async fn create_todo(input: &CreateTodo, pool: &PgPool) -> Result<Todo, AppError> {
let todo = sqlx::query_as(
r#"INSERT INTO todos (user_id,title,description,status)
VALUES ($1,$2,$3,$4)
RETURNING id,user_id,title,description,status,created_at,updated_at;
"#,
)
.bind(input.user_id)
.bind(&input.title)
.bind(&input.description)
.bind(input.status)
.fetch_one(pool)
.await?;
Ok(todo)
}
List Todos
- Use
fetch_all
to get all the rows.
#[derive(Debug)]
pub struct GetTodosArgs {
pub user_id: i64,
pub offset: i64,
pub limit: i64,
pub status: Option<i16>,
}
pub async fn get_todos_by_user_id(
args: GetTodosArgs,
pool: &PgPool,
) -> Result<Vec<Todo>, AppError> {
let mut offset = args.offset;
if args.offset < 0 {
offset = 0;
}
let mut limit = args.limit;
if args.limit <= 0 || args.limit > 100 {
limit = 100
}
let mut status = 0 as i16;
if args.status.is_some() {
status = args.status.unwrap();
}
let todos = sqlx::query_as(
r#"
SELECT id,user_id,title,description,status,created_at,updated_at
FROM todos
WHERE user_id=$1 AND status=$2
ORDER BY id DESC
LIMIT $3
OFFSET $4;
"#,
)
.bind(args.user_id)
.bind(status)
.bind(limit)
.bind(offset)
.fetch_all(pool)
.await?;
Ok(todos)
}
Update Todo Status
pub async fn update_todo_status(id: i64, status: i16, pool: &PgPool) -> Result<Todo, AppError> {
let todo = sqlx::query_as(
r#"
UPDATE todos SET status=$1 WHERE id=$2
RETURNING id,user_id,title,description,status,created_at,updated_at;
"#,
)
.bind(status)
.bind(id)
.fetch_one(pool)
.await?;
Ok(todo)
}
Qeury data with models in Main
sqlx-example/src/main.rs
use sqlx::PgPool;
use sqlx_emaple::{
errors::AppError,
models::{Todo, User},
todolist::{self, CreateTodo, GetTodosArgs},
user::{self, CreateUser},
};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Hello, world!");
let pool = PgPool::connect("postgres://db_manager:super_admin8801@localhost:5432/my_todo_list")
.await?;
println!("connected database: {:?}", pool);
let user_id = match create_user(&pool).await {
Ok(u) => {
println!("created user: {:?}", u);
u.id
}
Err(e) => {
println!("failed to create user: {}", e);
let u = user::get_user_by_id(1, &pool).await?.unwrap();
u.id
}
};
let todo = create_todo(&pool, user_id).await?;
println!("created todo: {:?}", todo);
let todos = get_todo_by_user_id(&pool, user_id).await?;
println!("get_todo_by_user_id: {:?}", todos);
Ok(())
}
async fn create_user(pool: &PgPool) -> Result<User, AppError> {
let user = CreateUser {
username: "Alice".to_string(),
email: "[email protected]".to_string(),
};
user::create_user(&user, pool).await
}
async fn create_todo(pool: &PgPool, user_id: i64) -> Result<Todo, AppError> {
let create_todo = CreateTodo {
user_id,
title: "Query Data with SQLX".to_string(),
description: "sqlx usage".to_string(),
status: 0,
};
todolist::create_todo(&create_todo, pool).await
}
async fn get_todo_by_user_id(pool: &PgPool, user_id: i64) -> Result<Vec<Todo>, AppError> {
todolist::get_todos_by_user_id(
GetTodosArgs {
user_id,
offset: 0,
limit: 100,
status: None,
},
pool,
)
.await
}
The data has been inserted in Postgres
my_todo_list=> select * from users;
id | username | email | created_at | updated_at
----+----------+----------------+-------------------------------+-------------------------------
1 | Alice | [email protected] | 2024-11-17 12:13:40.819274+08 | 2024-11-17 12:13:40.819274+08
(1 row)
my_todo_list=> select * from todos;
id | user_id | title | description | status | created_at | updated_at
----+---------+----------------------+-------------+--------+-------------------------------+-------------------------------
1 | 1 | Query Data with SQLX | sqlx usage | 0 | 2024-11-17 12:24:10.611699+08 | 2024-11-17 12:24:10.611699+08
2 | 1 | Query Data with SQLX | sqlx usage | 0 | 2024-11-17 12:27:07.354045+08 | 2024-11-17 12:27:07.354045+08
(2 rows)
sqlx-emaple (master)> cargo run .
Compiling sqlx-emaple v0.1.0 (~/sqlx-emaple)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.75s
Running `target/debug/sqlx-emaple .`
Hello, world!
connected database: Pool { size: 1, num_idle: 1, is_closed: false, options: PoolOptions { max_connections: 10, min_connections: 0, connect_timeout: 30s, max_lifetime: Some(1800s), idle_timeout: Some(600s), test_before_acquire: true } }
failed to create user: user email already been used: [email protected]
created todo: Todo { id: 2, user_id: 1, title: "Query Data with SQLX", description: "sqlx usage", status: 0, created_at: 2024-11-17T04:27:07.354045Z, updated_at: 2024-11-17T04:27:07.354045Z }
get_todo_by_user_id: [Todo { id: 2, user_id: 1, title: "Query Data with SQLX", description: "sqlx usage", status: 0, created_at: 2024-11-17T04:27:07.354045Z, updated_at: 2024-11-17T04:27:07.354045Z }, Todo { id: 1, user_id: 1, title: "Query Data with SQLX", description: "sqlx usage", status: 0, created_at: 2024-11-17T04:24:10.611699Z, updated_at: 2024-11-17T04:24:10.611699Z }]
Managing Transactions
- Starting and committing transactions
let mut tx = pool.begin().await?;
Set Model: Account
#[derive(Debug, Clone, FromRow, PartialEq)]
pub struct Account {
pub account_id: i64,
pub user_id: i64,
pub balance: BigDecimal,
pub currency: String,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub is_active: bool,
}
Create Account
sqlx-example/src/account.rs
use serde::Deserialize;
use sqlx::{PgPool, Postgres, Transaction};
use crate::{errors::AppError, models::Account};
#[derive(Debug, Deserialize)]
pub struct CreateAccount {
pub user_id: i64,
pub balance: f64,
pub currency: String,
pub is_active: bool,
}
pub async fn create_account(input: &CreateAccount, pool: &PgPool) -> Result<Account, AppError> {
let account = sqlx::query_as("INSERT INTO account(user_id, balance, currency, is_active) VALUES($1, $2, $3, $4) RETURNING account_id,user_id, balance, currency,created_at,updated_at,is_active")
.bind(&input.user_id)
.bind(&input.balance)
.bind(&input.currency)
.bind(&input.is_active)
.fetch_one(pool)
.await?;
Ok(account)
}
Transfer amount
to another Account
sqlx-example/src/main.rs
use sqlx::PgPool;
use sqlx_emaple::{
account::{self, CreateAccount},
errors::AppError,
models::{Account, Todo, User},
todolist::{self, CreateTodo, GetTodosArgs},
user::{self, CreateUser},
};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Hello, world!");
let pool = PgPool::connect("postgres://db_manager:super_admin8801@localhost:5432/my_todo_list")
.await?;
println!("connected database: {:?}", pool);
println!();
// let user1 = create_user(&pool, "Alex".to_string()).await?;
// let user2 = create_user(&pool, "Bob".to_string()).await?;
let account1 = create_account(&pool, 2 as i64, 1000.0 as f64).await?;
let account2 = create_account(&pool, 3 as i64, 1000.0 as f64).await?;
println!(
"transfer from account1: {:?}, to account2: {:?}",
account1, account2
);
transfer_from_to(
&pool,
account1.account_id,
account2.account_id,
100.0 as f64,
)
.await?;
Ok(())
}
transfer_from_to
sqlx-example/src/main.rs
async fn transfer_from_to(
pool: &PgPool,
from_user: i64,
to_user: i64,
amount: f64,
) -> Result<(), AppError> {
println!(
"start transaction transafer from: {} to: {}, with amount: {}",
from_user, to_user, amount
);
let mut tx = pool.begin().await?;
let from_account = account::withdrawal(from_user, amount, &mut tx).await?;
println!("withdrawal {} from acount: {:?}", amount, from_account);
let to_account = account::deposit(to_user, amount, &mut tx).await?;
println!("deposit {} from acount: {:?}", amount, to_account);
match tx.commit().await {
Ok(()) => {
println!("success transfer {} to {}", amount, to_account.account_id);
}
Err(e) => {
println!(
"transfer {} to {} failed: {:?}",
amount, to_account.account_id, e
);
}
}
Ok(())
}
Withdrawal
sqlx-example/src/account.rs
pub async fn withdrawal(
account_id: i64,
amount: f64,
tx: &mut Transaction<'static, Postgres>,
) -> Result<Account, AppError> {
// 1 lock thw account for update
let _ = sqlx::query("SELECT * FROM account WHERE account_id=$1 FOR UPDATE")
.bind(account_id)
.fetch_one(&mut **tx)
.await?;
// 2 update account balance
let now: DateTime<Utc> = Utc::now();
let account = sqlx::query_as(
"UPDATE account SET balance=balance - $1, updated_at=$2
WHERE account_id=$3
RETURNING account_id,user_id, balance, currency,created_at,updated_at,is_active",
)
.bind(amount)
.bind(now)
.bind(account_id)
.fetch_one(&mut **tx)
.await?;
Ok(account)
}
Deposit
sqlx-example/src/account.rs
pub async fn deposit<'a>(
account_id: i64,
amount: f64,
tx: &mut Transaction<'static, Postgres>,
) -> Result<Account, AppError> {
// 1 lock thw account for update
let _ = sqlx::query("SELECT * FROM account WHERE account_id=$1 FOR UPDATE")
.bind(account_id)
.execute(&mut **tx)
.await?;
// 2 update account balance
let now: DateTime<Utc> = Utc::now();
let account = sqlx::query_as(
"UPDATE account SET balance=balance + $1, updated_at=$2
WHERE account_id=$3
RETURNING account_id,user_id, balance, currency,created_at,updated_at,is_active",
)
.bind(amount)
.bind(now)
.bind(account_id)
.fetch_one(&mut **tx)
.await?;
Ok(account)
}
Run Transaction
sqlx-emaple (master)> cargo run .
warning: `sqlx-emaple` (bin "sqlx-emaple") generated 3 warnings
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.43s
Running `target/debug/sqlx-emaple .`
Hello, world!
connected database: Pool { size: 1, num_idle: 1, is_closed: false, options: PoolOptions { max_connections: 10, min_connections: 0, connect_timeout: 30s, max_lifetime: Some(1800s), idle_timeout: Some(600s), test_before_acquire: true } }
transfer from account1: Account { account_id: 4, user_id: 2, balance: BigDecimal(sign=Plus, scale=0, digits=[1000]), currency: "USD", created_at: 2024-11-17T09:13:07.588104Z, updated_at: 2024-11-17T09:13:07.588104Z, is_active: true }, to account2: Account { account_id: 5, user_id: 3, balance: BigDecimal(sign=Plus, scale=0, digits=[1000]), currency: "USD", created_at: 2024-11-17T09:13:07.617288Z, updated_at: 2024-11-17T09:13:07.617288Z, is_active: true }
start transaction transafer from: 4 to: 5, with amount: 100
withdrawal 100 from acount: Account { account_id: 4, user_id: 2, balance: BigDecimal(sign=Plus, scale=0, digits=[900]), currency: "USD", created_at: 2024-11-17T09:13:07.588104Z, updated_at: 2024-11-17T09:13:07.622722Z, is_active: true }
deposit 100 from acount: Account { account_id: 5, user_id: 3, balance: BigDecimal(sign=Plus, scale=0, digits=[1100]), currency: "USD", created_at: 2024-11-17T09:13:07.617288Z, updated_at: 2024-11-17T09:13:07.629311Z, is_active: true }
success transfer 100 to 5
my_todo_list=> select * from account;
account_id | user_id | balance | currency | created_at | updated_at | is_active
------------+---------+---------+----------+-------------------------------+-------------------------------+-----------
4 | 2 | 900.00 | USD | 2024-11-17 17:13:07.588104+08 | 2024-11-17 17:13:07.622722+08 | t
5 | 3 | 1100.00 | USD | 2024-11-17 17:13:07.617288+08 | 2024-11-17 17:13:07.629311+08 | t
(5 rows)
Complex Queries
Batch Insert Rows
bulk_create_todos
sqlx-example/src/models/todolist.rs
pub async fn bulk_create_todos(pool: &PgPool, todos: &[CreateTodo]) -> Result<(), sqlx::Error> {
let user_ids: Vec<i64> = todos.iter().map(|s| s.user_id).collect();
let titles: Vec<String> = todos.iter().map(|s| s.title.clone()).collect();
let descs: Vec<String> = todos.iter().map(|s| s.description.clone()).collect();
let status: Vec<i16> = todos.iter().map(|s| s.status).collect();
sqlx::query!(
r#"INSERT INTO todos (user_id,title,description,status)
SELECT * FROM UNNEST($1::bigint[], $2::text[], $3::text[], $4::smallint[])"#,
&user_ids[..],
&titles[..],
&descs[..],
&status[..],
)
.execute(pool)
.await?;
Ok(())
}
bulk insert main
sqlx-example/examples/bulk_insert.rs
use sqlx::PgPool;
use sqlx_example::todolist::{self, CreateTodo};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Bulk Insert example");
let pool = PgPool::connect("postgres://db_manager:xxx@localhost:5432/my_todo_list").await?;
println!("connected database: {:?}", pool);
println!();
let todos = vec![
CreateTodo {
user_id: 1,
title: "learn axum".to_string(),
description: "axum".to_string(),
status: 0,
},
CreateTodo {
user_id: 2,
title: "learn sqlx".to_string(),
description: "sqlx".to_string(),
status: 0,
},
CreateTodo {
user_id: 1,
title: "learn react".to_string(),
description: "react".to_string(),
status: 0,
},
];
todolist::bulk_create_todos(&pool, &todos).await?;
println!("bulk create todos success!");
Ok(())
}
my_todo_list=> select * from todos order by created_at desc limit 3;
id | user_id | title | description | status | created_at | updated_at
----+---------+-------------+-------------+--------+-------------------------------+-------------------------------
3 | 1 | learn axum | axum | 0 | 2024-11-18 08:44:24.291281+08 | 2024-11-18 08:44:24.291281+08
4 | 2 | learn sqlx | sqlx | 0 | 2024-11-18 08:44:24.291281+08 | 2024-11-18 08:44:24.291281+08
5 | 1 | learn react | react | 0 | 2024-11-18 08:44:24.291281+08 | 2024-11-18 08:44:24.291281+08
(3 rows)