From 41072b91d1421226903b823943f20eb1a8fc1123 Mon Sep 17 00:00:00 2001 From: Aleksandr Trushkin Date: Wed, 1 May 2024 16:51:07 +0300 Subject: [PATCH] some changes? --- build.rs | 2 +- db/001_initial.sql | 4 - db/002_subscribers.sql | 12 ++ src/main.rs | 108 ++++-------- src/repo.rs | 364 +++++++++++++++++++++++++++++------------ 5 files changed, 305 insertions(+), 185 deletions(-) create mode 100644 db/002_subscribers.sql diff --git a/build.rs b/build.rs index 1ba960d..fa15e7f 100644 --- a/build.rs +++ b/build.rs @@ -32,7 +32,7 @@ fn main() { } fn get_value_from_env(key: &str) -> Option { - env::var(key).map_or_else(|_| None, |v| Some(v)) + env::var(key).map_or_else(|_| None, Some) } fn get_value_from_command, S: AsRef>( diff --git a/db/001_initial.sql b/db/001_initial.sql index 2ef6d10..24f8800 100644 --- a/db/001_initial.sql +++ b/db/001_initial.sql @@ -1,5 +1,3 @@ -BEGIN; - CREATE TABLE IF NOT EXISTS users ( `user_id` integer NOT NULL PRIMARY KEY AUTOINCREMENT, `chat_id` integer NOT NULL UNIQUE, @@ -29,8 +27,6 @@ CREATE TABLE IF NOT EXISTS actions ( ON DELETE CASCADE ); -COMMIT; - -- drop index actions_action_id_user_id_idx; -- drop table users; -- drop table parameters; diff --git a/db/002_subscribers.sql b/db/002_subscribers.sql new file mode 100644 index 0000000..41d9241 --- /dev/null +++ b/db/002_subscribers.sql @@ -0,0 +1,12 @@ +CREATE TABLE IF NOT EXISTS `subscribers` ( + `subscriber_id` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + `user_id` INTEGER NOT NULL, + `kind` VARCHAR(16) NOT NULL, + `arguments` JSON NOT NULL DEFAULT '{}', + FOREIGN KEY(`user_id`) + REFERENCES users(user_id) + ON DELETE CASCADE +); + +CREATE UNIQUE INDEX IF NOT EXISTS subscribers_kind_user_id + ON subscribers (`kind`, `user_id`); diff --git a/src/main.rs b/src/main.rs index 3159e1b..d074f66 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,3 @@ -#![allow(unused)] - mod climate; mod repo; mod utils; @@ -8,15 +6,8 @@ use anyhow::Result; use climate::SelfTemperature; use envconfig::Envconfig; use log::{debug, info, warn}; -use teloxide::{ - dispatching::{update_listeners::AsUpdateStream, UpdateFilterExt}, - dptree::di::Injectable, - filter_command, - payloads::SendMessage, - prelude::*, - utils::command::BotCommands, -}; -use tokio_stream::StreamExt; +use repo::UserDB; +use teloxide::{dispatching::UpdateFilterExt, prelude::*, utils::command::BotCommands}; use crate::repo::Storage; @@ -24,17 +15,20 @@ const VERSION: &str = env!("GIT_REVISION"); const BRANCH: &str = env!("GIT_BRANCH"); #[tokio::main] -async fn main() { +async fn main() -> Result<()> { env_logger::init(); debug!("starting the application"); - println!("starting the application"); - tokio::spawn(run()).await.unwrap(); + tokio::spawn(run()).await??; + + Ok(()) } #[derive(Envconfig, Clone, Debug)] struct Settings { + /// Token is used to authenticate itself as a bot and being able + /// to handle incoming commands and messages. #[envconfig(from = "ALTEREGO_TELEGRAM_TOKEN")] pub telegram_token: String, @@ -82,32 +76,7 @@ async fn run() -> anyhow::Result<()> { let handler = Update::filter_message() .filter_command::() .chain(dptree::filter(|msg: Message| msg.chat.is_private())) - .chain(dptree::filter_map_async( - |msg: Message, storage: repo::SqliteRepo| async move { - let chat_id = msg.chat.id.0; - - info!("checking if the user {chat_id} exists"); - let user = storage.load_user_by_chat_id(chat_id).await.unwrap(); - match user { - Some(user) => Some(user), - None => { - let fname = msg.chat.first_name().unwrap_or_default(); - let lname = msg.chat.last_name().unwrap_or_default(); - - info!("performing upsert for user {lname} {fname}"); - - let user_db = storage - .create_user(chat_id, [fname, lname].join(" ")) - .await - .unwrap(); - - info!("upserted user {user_db:?}"); - - Some(user_db) - } - } - }, - )) + .chain(dptree::filter_map_async(find_user_mw)) .branch(dptree::case![Command::RoomTemperature].endpoint(handle_temperature_sensor)) .branch(dptree::case![Command::HostTemperature].endpoint(handle_host_temperature)) .branch(dptree::case![Command::VersionRequest].endpoint(handle_version)) @@ -134,7 +103,30 @@ async fn run() -> anyhow::Result<()> { Ok(()) } -type HandlerResult = std::result::Result>; +async fn find_user_mw(msg: Message, storage: repo::SqliteRepo) -> Option { + let chat_id = msg.chat.id.0; + + info!("checking if the user with chat_id={chat_id} exists"); + let user = storage.load_user_by_chat_id(chat_id).await.unwrap(); + match user { + Some(user) => Some(user), + None => { + let fname = msg.chat.first_name().unwrap_or_default(); + let lname = msg.chat.last_name().unwrap_or_default(); + + info!("performing upsert for user {lname} {fname}"); + + let user_db = storage + .create_user(chat_id, [fname, lname].join(" ")) + .await + .unwrap(); + + info!("upserted user {user_db:?}"); + + Some(user_db) + } + } +} fn error_msg(reqid: &utils::RequestID) -> String { format!("There was an error handling command, sorry. Reffer to {reqid}") @@ -145,7 +137,6 @@ async fn handle_temperature_sensor( msg: Message, climate: climate::Client, next_req_id: utils::Generators, - storage: repo::SqliteRepo, user: repo::UserDB, ) -> Result<()> { let chat_id = msg.chat.id; @@ -220,39 +211,6 @@ async fn handle_help(bot: AutoSend, msg: Message) -> Result<()> { Ok(()) } -struct Handler { - storage: S, - climate: climate::Client, - self_temp: climate::SelfTemperature, - started: std::time::Instant, -} - -fn log_error<'a, E: std::fmt::Display>(req_id: &'a str, msg: &'a str) -> impl FnOnce(E) -> E + 'a { - move |err: E| -> E { - warn!( - "request_id={}, {} err={}", - req_id.to_owned(), - msg.to_owned(), - err - ); - err - } -} - -fn log_message(req_id: &str, msg: Message) { - info!( - "message sent to chat_id={}, text={}", - msg.chat.id, - msg.text().unwrap_or_default(), - ) -} - -#[derive(serde::Deserialize, Debug)] -struct Climate { - humidity: f32, - temp: f32, -} - #[derive(BotCommands, Debug, Clone, PartialEq, Eq)] #[command(rename = "lowercase", description = "These commands are supported:")] enum Command { diff --git a/src/repo.rs b/src/repo.rs index 8b5d390..7888432 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -3,12 +3,9 @@ use std::str::FromStr; use anyhow::Result; use log::debug; use sqlx::{ - sqlite::{ - SqliteConnectOptions, SqliteError, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous, - }, - Executor, Pool, Sqlite, + sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous}, + FromRow, Pool, Sqlite, }; -use teloxide::types::User; #[derive(Debug)] pub struct SqliteConfig { @@ -21,10 +18,10 @@ pub struct SqliteConfig { impl Default for SqliteConfig { fn default() -> Self { Self { - source: ":memory:".to_string(), + source: "sqlite::memory:".to_string(), timeout: std::time::Duration::from_secs(10), - max_conns: 2, - migrate: false, + max_conns: 1, + migrate: true, } } } @@ -36,8 +33,8 @@ pub struct SqliteRepo { impl SqliteRepo { pub async fn from_config(config: SqliteConfig) -> Result { - let dsn = if !config.source.starts_with("sqlite://") { - format!("sqlite://{}", &config.source) + let dsn = if !config.source.starts_with("sqlite:") { + format!("sqlite:{}", &config.source) } else { config.source }; @@ -56,9 +53,7 @@ impl SqliteRepo { .connect_with(opts) .await?; - if config.migrate { - sqlx::migrate!("./db").run(&pool).await?; - } + sqlx::migrate!("./db").run(&pool).await?; sqlx::query("pragma temp_store = memory;") .execute(&pool) @@ -76,7 +71,7 @@ impl SqliteRepo { } } -#[derive(sqlx::FromRow, Debug, Clone)] +#[derive(FromRow, Debug, Clone, PartialEq)] pub struct UserDB { pub user_id: i64, pub chat_id: i64, @@ -84,7 +79,7 @@ pub struct UserDB { pub created_at: chrono::NaiveDateTime, } -#[derive(sqlx::FromRow)] +#[derive(FromRow)] pub struct ParameterDB { pub param_id: i64, pub user_id: i64, @@ -107,42 +102,178 @@ pub trait Storage { async fn load_user_by_chat_id(&self, chat_id: i64) -> Result>; async fn get_user_parameters(&self, user_id: i64) -> Result; - async fn upsert_parameter( - &self, - user_id: i64, - key: String, - value: String, - ) -> Result; - async fn delete_parameter(&self, param_id: i64) -> Result<()>; + async fn upsert_parameter(&self, user_id: i64, key: &str, value: &str) -> Result; - async fn insert_action(&self, user_id: i64, name: String) -> Result<()>; + async fn insert_action(&self, user_id: i64, name: &str) -> Result<()>; +} + +type SQLResult = sqlx::Result; + +async fn create_user(chat_id: i64, name: String, pool: &Pool) -> SQLResult { + sqlx::query_as( + "INSERT INTO `users` (`chat_id`, `name`, `created_at`) + VALUES (?, ?, datetime('now')) + RETURNING `user_id`, `chat_id`, `name`, `created_at`;", + ) + .bind(chat_id) + .bind(name) + .fetch_one(pool) + .await +} + +struct FindUserParams { + pub(crate) user_id: Option, + pub(crate) chat_id: Option, +} + +impl FindUserParams { + pub fn new() -> Self { + Self { + user_id: None, + chat_id: None, + } + } + + pub fn with_user_id(mut self, user_id: i64) -> Self { + self.user_id = Some(user_id); + self + } + + pub fn with_chat_id(mut self, chat_id: i64) -> Self { + self.chat_id = Some(chat_id); + self + } +} + +async fn find_user(params: FindUserParams, executor: &Pool) -> sqlx::Result { + let mut qb = sqlx::QueryBuilder::new( + "SELECT `user_id`, `chat_id`, `name`, `created_at` FROM `users` WHERE 1=1", + ); + if let Some(user_id) = params.user_id { + qb.push(" AND `user_id` = "); + qb.push_bind(user_id); + }; + + if let Some(chat_id) = params.chat_id { + qb.push(" AND `chat_id` = "); + qb.push_bind(chat_id); + } + + let row = qb.build().fetch_one(executor).await?; + UserDB::from_row(&row) +} + +async fn get_parameters_by_user(user_id: i64, pool: &Pool) -> SQLResult { + let mut mp: MappedParameter = std::collections::HashMap::new(); + sqlx::query_as( + "SELECT `param_id`, `user_id`, `key`, `value` FROM parameters WHERE `user_id` = ?", + ) + .bind(user_id) + .fetch_all(pool) + .await? + .into_iter() + .for_each(|result: ParameterDB| { + let param = ParameterBase { + param_id: result.param_id, + user_id: result.user_id, + value: result.value, + }; + + mp.insert(result.key, param); + }); + + Ok(mp) +} + +async fn upsert_parameter_for_user( + user_id: i64, + key: &str, + value: &str, + pool: &Pool, +) -> SQLResult { + sqlx::query_as( + "INSERT INTO parameters (`user_id`, `key`, `value`) + VALUES (?, ?, ?) + RETURNING `param_id`, `user_id`, `key`, `value`;", + ) + .bind(user_id) + .bind(key) + .bind(value) + .fetch_one(pool) + .await +} + +async fn insert_user_action(user_id: i64, name: &str, pool: &Pool) -> Result<()> { + sqlx::query("INSERT INTO actions (`user_id`, `name`) VALUES (?, ?)") + .bind(user_id) + .bind(name) + .execute(pool) + .await?; + + Ok(()) +} + +async fn subscriber_user( + user_id: i64, + kind: &str, + args: Option<&S>, + executor: &Pool, +) -> Result<()> +where + S: serde::Serialize, +{ + let args_out = match args { + Some(args) => Some(serde_json::to_string(args)?), + None => None, + }; + + sqlx::query("INSERT INTO subscribers (`user_id`, `kind`, `args`) VALUES (?, ?, ?)") + .bind(user_id) + .bind(kind) + .bind(args_out) + .execute(executor) + .await?; + + Ok(()) +} + +async fn unsubscribe_user(user_id: i64, kind: &str, executor: &Pool) -> Result<()> { + sqlx::query("DELETE FROM subscribers WHERE `user_id` = ? AND `kind` = ?") + .bind(user_id) + .bind(kind) + .execute(executor) + .await?; + + Ok(()) +} + +#[derive(FromRow, Debug)] +pub struct SubscriptionDB { + pub subscribe_id: i64, + pub user_id: i64, + pub kind: String, + pub args: String, +} + +async fn find_subscribers_by_kind( + kind: &str, + executor: &Pool, +) -> Result> { + Ok(sqlx::query_as( "SELECT `subscriber_id`, `user_id`, `kind`, `arguments` FROM `subcribers` WHERE `kind` = ?") + .bind(kind) + .fetch_all(executor) + .await?) } #[async_trait::async_trait] impl Storage for SqliteRepo { async fn create_user(&self, chat_id: i64, name: String) -> Result { - Ok(sqlx::query_as!( - UserDB, - "INSERT INTO users (chat_id, name, created_at)" - + " VALUES (?, ?, datetime('now'))" - + " RETURNING user_id, chat_id, name, created_at;", - chat_id, - name, - ) - .fetch_one(&self.pool) - .await?) + Ok(create_user(chat_id, name, &self.pool).await?) } async fn load_user_by_chat_id(&self, chat_id: i64) -> Result> { - let result: std::result::Result = sqlx::query_as!( - UserDB, - "SELECT user_id, chat_id, name, created_at" + " FROM users WHERE `chat_id` = ?;", - chat_id, - ) - .fetch_one(&self.pool) - .await; - - match result { + let params = FindUserParams::new().with_chat_id(chat_id); + match find_user(params, &self.pool).await { Ok(row) => Ok(Some(row)), Err(err) => match err { sqlx::Error::RowNotFound => Ok(None), @@ -152,15 +283,8 @@ impl Storage for SqliteRepo { } async fn get_user(&self, user_id: i64) -> Result> { - let result: std::result::Result = sqlx::query_as!( - UserDB, - "SELECT user_id, chat_id, name, created_at" + " FROM users WHERE `user_id` = ?;", - user_id, - ) - .fetch_one(&self.pool) - .await; - - match result { + let params = FindUserParams::new().with_user_id(user_id); + match find_user(params, &self.pool).await { Ok(row) => Ok(Some(row)), Err(err) => match err { sqlx::Error::RowNotFound => Ok(None), @@ -170,63 +294,93 @@ impl Storage for SqliteRepo { } async fn get_user_parameters(&self, user_id: i64) -> Result { - let mut mp: MappedParameter = std::collections::HashMap::new(); - sqlx::query_as!( - ParameterDB, - "SELECT `param_id`, `user_id`, `key`, `value`" + " FROM parameters WHERE `user_id` = ?", - user_id, - ) - .fetch_all(&self.pool) - .await? - .into_iter() - .for_each(|result| { - let param = ParameterBase { - param_id: result.param_id, - user_id: result.user_id, - value: result.value, - }; - - mp.insert(result.key, param); - }); - - Ok(mp) + Ok(get_parameters_by_user(user_id, &self.pool).await?) } - async fn upsert_parameter( - &self, - user_id: i64, - key: String, - value: String, - ) -> Result { - Ok(sqlx::query_as!( - ParameterDB, - "INSERT INTO parameters (`user_id`, `key`, `value`)" - + " VALUES (?, ?, ?)" - + " RETURNING `param_id`, `user_id`, `key`, `value`;", - user_id, - key, - value, - ) - .fetch_one(&self.pool) - .await?) + async fn upsert_parameter(&self, user_id: i64, key: &str, value: &str) -> Result { + Ok(upsert_parameter_for_user(user_id, key, value, &self.pool).await?) } - async fn delete_parameter(&self, param_id: i64) -> Result<()> { - sqlx::query("DELETE FROM parameters WHERE `param_id` = ?;") - .bind(param_id) - .execute(&self.pool) - .await?; - - Ok(()) - } - - async fn insert_action(&self, user_id: i64, name: String) -> Result<()> { - sqlx::query("INSERT INTO actions (`user_id`, `name`) VALUES (?, ?)") - .bind(user_id) - .bind(name) - .execute(&self.pool) - .await?; - - Ok(()) + async fn insert_action(&self, user_id: i64, name: &str) -> Result<()> { + insert_user_action(user_id, name, &self.pool).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Once; + + static ONCE: Once = Once::new(); + + async fn prepare() -> Result> { + const DSN: &str = "sqlite::memory:"; + + ONCE.call_once(|| { + std::env::set_var("RUST_LOG", "debug"); + env_logger::init(); + }); + + let pool = SqlitePoolOptions::new().connect(DSN).await?; + sqlx::migrate!("./db").run(&pool).await?; + + sqlx::query( + "INSERT INTO `users` (user_id, chat_id, name, created_at) + VALUES (1, 100, 'Alex', datetime('now'));", + ) + .execute(&pool) + .await?; + + Ok(pool) + } + + #[tokio::test] + pub async fn test_get_user_by_chat_id() { + let executor = prepare().await.expect("should prepare store"); + let params = FindUserParams::new().with_chat_id(100); + let user = find_user(params, &executor) + .await + .expect("should found user"); + + let exp_user = UserDB { + user_id: 1, + chat_id: 100, + name: "Alex".to_string(), + created_at: user.created_at, + }; + assert_eq!(exp_user, user); + } + + #[tokio::test] + pub async fn test_get_user_by_user_id() { + let executor = prepare().await.expect("should prepare store"); + let params = FindUserParams::new().with_user_id(1); + let user = find_user(params, &executor) + .await + .expect("should found user"); + + let exp_user = UserDB { + user_id: 1, + chat_id: 100, + name: "Alex".to_string(), + created_at: user.created_at, + }; + assert_eq!(exp_user, user); + } + + #[tokio::test] + pub async fn test_create_user() { + let pool = prepare().await.expect("should prepare store"); + let user = create_user(101, "Phew".to_owned(), &pool) + .await + .expect("should create user"); + + let exp_user = UserDB { + user_id: user.user_id, + chat_id: 101, + name: "Phew".to_string(), + created_at: user.created_at, + }; + assert_eq!(exp_user, user); } }