diff --git a/.env b/.env index 758970d..cb87fe5 100644 --- a/.env +++ b/.env @@ -1,3 +1,6 @@ -ALTEREGO_TELEGRAM_TOKEN= -ALTEREGO_CLIMATE_DSN= -ALTEREGO_HOSTTEMP_CMD= +ALTEREGO_TELEGRAM_TOKEN=170515067:AAElDJ8Sq_oIqo9WaL4DKvUr13nSEIdHCYs +ALTEREGO_CLIMATE_DSN=http://192.168.1.159/ +ALTEREGO_HOSTTEMP_CMD="/opt/vc/bin/vcgencmd measure_temp" +ALTEREGO_DATABASE_URL=sqlite://.testdata/db.sqlite +DATABASE_URL=sqlite://.testdata/db.sqlite +RUST_LOG=debug diff --git a/.gitignore b/.gitignore index 96ef6c0..665bfd0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /target Cargo.lock +.testdata +.vscode diff --git a/Cargo.toml b/Cargo.toml index 8d137c7..b66e10d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,20 +1,35 @@ [package] name = "altherego" -version = "0.9.5" -authors = ["Aleksandr Trushkin "] +version = "0.9.9" +authors = ["Aleksandr Trushkin "] edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -teloxide = "0.3" -teloxide-macros = "0.3" -tokio = {version = "0.2.23", features = ["full"]} -reqwest = "0.10.8" -serde = "1.0.117" -serde_json = "1.0.59" +teloxide = { version = "0.9.0", features = ["macros", "auto-send"] } +tokio = {version = "1.8", features = ["full"]} uuid = { version = "0.8.1", features = ["v4"] } log = "0.4" env_logger = "0.8.1" envconfig = "0.9.1" -openssl = {version="*", features = ["vendored"]} +serde = "1.0.137" +reqwest = { version = "0.11.10", features = ["tokio-native-tls"] } +serde_json = "1.0.81" +sqlx = { version = "0.5.13", features = ["sqlite", "runtime-tokio-native-tls", "chrono", "migrate"] } +chrono = "0.4.19" +anyhow = "1.0.57" +async-trait = "0.1.53" +tokio-stream = "0.1.8" +rand = "0.8.5" + +[[bin]] +name = "altherego" +path = "src/main.rs" + +[[bin]] +name = "migrator" +path = "src/migrator/main.rs" + +[profile.dev.package.sqlx-macros] +opt-level = 3 diff --git a/build.rs b/build.rs index 5a33b86..7455a7a 100644 --- a/build.rs +++ b/build.rs @@ -1,6 +1,6 @@ use std::env; -fn main() { +fn main() -> Result<(), Box> { let rev = get_value_from_env("GIT_VERSION") .or_else(|| get_value_from_command("git", ["rev-parse", "--short", "HEAD"])) .unwrap_or_else(|| "unknown".to_owned()); @@ -12,6 +12,25 @@ fn main() { println!("cargo:rustc-env=GIT_REVISION={}", rev); println!("cargo:rustc-env=GIT_BRANCH={}", branch); println!("cargo:rerun-if-env-changed=GIT_REVISION"); + + if let Ok(data) = std::fs::read_to_string(".env") { + data.split('\n').into_iter().for_each(|v| { + let kv: Vec<&str> = v.split('=').collect(); + if kv.len() != 2 { + return; + } + let (key, value) = (kv[0], kv[1]); + + if key == "DATABASE_URL" { + return; + } + + println!("cargo:rustc-env={}={}", key, value); + println!("cargo:rerun-if-env-changed={}", key); + }) + } + + Ok(()) } fn get_value_from_env(key: &str) -> Option { diff --git a/db/001_initial.sql b/db/001_initial.sql new file mode 100644 index 0000000..554addd --- /dev/null +++ b/db/001_initial.sql @@ -0,0 +1,28 @@ +CREATE TABLE IF NOT EXISTS users ( + `user_id` integer NOT NULL PRIMARY KEY AUTOINCREMENT, + `chat_id` integer NOT NULL UNIQUE, + `name` VARCHAR(64) NOT NULL, + created_at datetime NOT NULL +); + +CREATE TABLE IF NOT EXISTS parameters ( + `param_id` integer NOT NULL PRIMARY KEY AUTOINCREMENT, + `user_id` integer NOT NULL, + `key` VARCHAR(64) NOT NULL, + `value` VARCHAR(64) NOT NULL, + FOREIGN KEY(user_id) + REFERENCES users(user_id) + ON DELETE CASCADE +); + +CREATE UNIQUE INDEX IF NOT EXISTS actions_action_id_user_id_idx + ON parameters (`user_id`, `key`); + +CREATE TABLE IF NOT EXISTS actions ( + `action_id` integer NOT NULL PRIMARY KEY AUTOINCREMENT, + `user_id` integer NOT NULL, + `name` VARCHAR(64) NOT NULL, + FOREIGN KEY(user_id) + REFERENCES users(user_id) + ON DELETE CASCADE +); diff --git a/makefile b/makefile index 09f0ad7..39bcf48 100644 --- a/makefile +++ b/makefile @@ -9,7 +9,7 @@ DOCKERFLAGS:=-it --rm \ DOCKERIMG:="rust-build-env:V1" APP_NAME:=altherego -IMAGE:=rust:1.49 +IMAGE:=rust:1.60 TARGET_ARCH:=armv7-unknown-linux-gnueabihf image: diff --git a/src/climate.rs b/src/climate.rs new file mode 100644 index 0000000..d8907d1 --- /dev/null +++ b/src/climate.rs @@ -0,0 +1,63 @@ +use anyhow::{anyhow, Result}; + +#[derive(Clone)] +pub struct Client { + addr: String, + client: reqwest::Client, +} + +#[derive(serde::Deserialize, Debug)] +pub struct Climate { + pub humidity: f32, + pub temp: f32, +} + +impl Client { + pub fn new(addr: &str) -> Self { + let addr = addr.to_owned(); + let client = reqwest::Client::builder() + .connect_timeout(std::time::Duration::from_secs(2)) + .build() + .expect("building client"); + + Self { addr, client } + } + + pub async fn fetch(&self) -> Result { + let request = self.client.get(&self.addr).build()?; + Ok(self.client.execute(request).await?.json().await?) + } +} + +#[derive(Clone)] +pub struct SelfTemperature { + cmd: String, + arg: String, +} + +impl SelfTemperature { + pub fn new(cmd: &str, arg: &str) -> Self { + let cmd = cmd.to_owned(); + let arg = arg.to_owned(); + + Self { cmd, arg } + } + + pub async fn fetch(&self) -> Result { + let output = tokio::process::Command::new(&self.cmd) + .arg(&self.arg) + .output() + .await?; + + if !output.status.success() { + let msg = std::string::String::from_utf8(output.stderr)?; + + return Err(anyhow!("getting temp from command: {}", msg)); + } + + let output = std::string::String::from_utf8(output.stdout)?; + let out = output.replace("temp=", "").parse()?; + + Ok(out) + } +} diff --git a/src/main.rs b/src/main.rs index edd0268..a690e6c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,38 @@ +#![allow(unused)] + +mod climate; +mod repo; +mod utils; + +use anyhow::Result; +use climate::SelfTemperature; use envconfig::Envconfig; use log::{debug, info, warn}; -use teloxide::{prelude::*, utils::command::BotCommand}; +use teloxide::{ + dispatching::{update_listeners::AsUpdateStream, UpdateFilterExt}, + dptree::di::Injectable, + filter_command, + payloads::SendMessage, + prelude::*, + utils::command::BotCommands, +}; +use tokio_stream::StreamExt; + +use crate::repo::Storage; const VERSION: &str = env!("GIT_REVISION"); const BRANCH: &str = env!("GIT_BRANCH"); #[tokio::main] async fn main() { + env_logger::init(); + debug!("starting the application"); tokio::spawn(run()).await.unwrap(); } -#[derive(Envconfig, Clone)] +#[derive(Envconfig, Clone, Debug)] struct Settings { #[envconfig(from = "ALTEREGO_TELEGRAM_TOKEN")] pub telegram_token: String, @@ -28,30 +48,200 @@ struct Settings { default = "/opt/vc/bin/vcgencmd measure_temp" )] pub hosttemp_cmd: String, + + #[envconfig(from = "ALTEREGO_DATABASE_URL", default = "./db.sqlite")] + pub db_source: String, } -async fn run() { - env_logger::init(); +async fn run() -> anyhow::Result<()> { + info!("starting"); let settings = Settings::init_from_env().expect("reading config values"); - let startup = std::sync::Arc::from(std::time::SystemTime::now()); + let bot = teloxide::Bot::new(&settings.telegram_token).auto_send(); + let repo_config = repo::SqliteConfig { + source: settings.db_source, + ..Default::default() + }; - let bot = teloxide::Bot::builder() - .token(&settings.telegram_token) - .build(); - let bot_name = "AlterEgo"; + info!("repo config: {repo_config:?}"); - teloxide::commands_repl(bot, bot_name, move |cx, command| { - let climate = settings.climate_dsn.clone(); - let cmd: String = settings.hosttemp_cmd.clone(); - let cmd: Vec<&str> = cmd.split(' ').collect(); - let console_cmd = cmd.first().expect("getting console command").to_string(); - let arg: String = cmd.get(1).unwrap_or(&"").to_string(); - let startup = *startup; + let sqlite_storage = repo::SqliteRepo::from_config(repo_config).await?; - async move { handler(cx, command, climate, console_cmd, arg, startup).await } - }) - .await; + let climate_client = climate::Client::new(&settings.climate_dsn); + + let self_temp_client = { + let splitted: Vec<&str> = settings.hosttemp_cmd.split(' ').into_iter().collect(); + let (cmd, arg) = (splitted[0], splitted[1]); + + climate::SelfTemperature::new(cmd, arg) + }; + + let handler = Update::filter_message() + .filter_command::() + .chain(dptree::filter(|msg: Message| msg.chat.is_private())) + .chain(dptree::filter_map_async( + |msg: Message, storage: repo::SqliteRepo| async move { + let chat_id = msg.chat.id.0; + + info!("checking if the user {chat_id} exists"); + let user = storage.load_user_by_chat_id(chat_id).await.unwrap(); + match user { + Some(user) => Some(user), + None => { + let fname = msg.chat.first_name().unwrap_or_default(); + let lname = msg.chat.last_name().unwrap_or_default(); + + info!("performing upsert for user {lname} {fname}"); + + let user_db = storage + .create_user(chat_id, [fname, lname].join(" ")) + .await + .unwrap(); + + info!("upserted user {user_db:?}"); + + Some(user_db) + } + } + }, + )) + .branch(dptree::case![Command::RoomTemperature].endpoint(handle_temperature_sensor)) + .branch(dptree::case![Command::HostTemperature].endpoint(handle_host_temperature)) + .branch(dptree::case![Command::VersionRequest].endpoint(handle_version)) + .branch(dptree::case![Command::Help].endpoint(handle_help)); + + let mut dependencies = DependencyMap::new(); + dependencies.insert(sqlite_storage); + dependencies.insert(climate_client); + dependencies.insert(self_temp_client); + dependencies.insert(utils::Generators::new()); + + info!("running"); + + Dispatcher::builder(bot, handler) + .dependencies(dependencies) + .default_handler(|upd| async move { + warn!("unhandled update: {:?}", upd); + }) + .build() + .setup_ctrlc_handler() + .dispatch() + .await; + + Ok(()) +} + +type HandlerResult = std::result::Result>; + +fn error_msg(reqid: &utils::RequestID) -> String { + format!("There was an error handling command, sorry. Reffer to {reqid}") +} + +async fn handle_temperature_sensor( + bot: AutoSend, + msg: Message, + climate: climate::Client, + next_req_id: utils::Generators, + storage: repo::SqliteRepo, + user: repo::UserDB, +) -> Result<()> { + let chat_id = msg.chat.id; + let reqid = next_req_id.next_request_id(); + + let name = user.name; + bot.send_message(chat_id, format!("Just a second, {name}, asking...")) + .await?; + + match climate.fetch().await { + Ok(temp) => { + let text = format!( + "Humidity is {} and temperature is {}", + temp.humidity, temp.temp, + ); + + bot.send_message(chat_id, text).await?; + } + Err(err) => { + warn!("[{reqid}] unable to fetch self_temp: {err}"); + let msg = error_msg(&reqid); + bot.send_message(chat_id, msg).await?; + } + }; + + Ok(()) +} + +async fn handle_host_temperature( + bot: AutoSend, + msg: Message, + temp: SelfTemperature, + next_req_id: utils::Generators, +) -> Result<()> { + let chat_id = msg.chat.id; + let reqid = next_req_id.next_request_id(); + + bot.send_message(chat_id, "Just a second, asking...") + .await?; + + match temp.fetch().await { + Ok(temp) => { + let text = format!("Host temperature is {} degrees celcius", temp,); + + bot.send_message(chat_id, text).await?; + } + Err(err) => { + warn!("[{reqid}] unable to fetch self_temp: {err}"); + let msg = error_msg(&reqid); + bot.send_message(chat_id, msg).await?; + } + } + + Ok(()) +} + +async fn handle_version(bot: AutoSend, msg: Message) -> Result<()> { + let chat_id = msg.chat.id; + let text = format!("Bot version is {} (branch: {})", VERSION, BRANCH,); + + bot.send_message(chat_id, text).await?; + + Ok(()) +} + +async fn handle_help(bot: AutoSend, msg: Message) -> Result<()> { + let chat_id = msg.chat.id; + + bot.send_message(chat_id, Command::descriptions().to_string()) + .await?; + + Ok(()) +} + +struct Handler { + storage: S, + climate: climate::Client, + self_temp: climate::SelfTemperature, + started: std::time::Instant, +} + +fn log_error<'a, E: std::fmt::Display>(req_id: &'a str, msg: &'a str) -> impl FnOnce(E) -> E + 'a { + move |err: E| -> E { + warn!( + "request_id={}, {} err={}", + req_id.to_owned(), + msg.to_owned(), + err + ); + err + } +} + +fn log_message(req_id: &str, msg: Message) { + info!( + "message sent to chat_id={}, text={}", + msg.chat.id, + msg.text().unwrap_or_default(), + ) } #[derive(serde::Deserialize, Debug)] @@ -60,99 +250,7 @@ struct Climate { temp: f32, } -async fn handler( - cx: UpdateWithCx, - command: Command, - dsn: String, - console_command: String, - console_arg: String, - startup: std::time::SystemTime, -) -> ResponseResult<()> { - let request_id = uuid::Uuid::new_v4(); - - info!( - "incoming request xreqid={} command={:?}", - request_id, command - ); - - match command { - Command::Help => cx.answer(Command::descriptions()).send().await?, - Command::HostTemperature => { - info!( - "querying command {} with arg {}", - console_command, console_arg - ); - - let cmd = std::process::Command::new(&console_command) - .arg(&console_arg) - .stdout(std::process::Stdio::piped()) - .spawn() - .expect("running vcgencmd command"); - - let output = cmd.wait_with_output().expect("waiting for output"); - - let parsed = - std::string::String::from_utf8(output.stdout).expect("casting into string"); - - let parsed = parsed.replace("temp=", ""); - - cx.answer_str(format!("Your Raspberry PI temperature is {}", parsed)) - .await? - } - Command::RoomTemperature => { - info!("sending request to {}", dsn); - - let response = match reqwest::get(&dsn).await { - Ok(response) => response, - Err(err) => { - warn!( - "unable to handle request xreqid={} error={:?}", - request_id, err - ); - cx.answer_str(format!("something went wrong, reference to {}", request_id)) - .await?; - - return Err(RequestError::NetworkError(err)); - } - }; - - let info: Climate = match response.json::().await { - Ok(result) => result, - Err(err) => { - warn!( - "unable to handle request xreqid={} error={:?}", - request_id, err - ); - cx.answer_str(format!("something went wrong, reference to {}", request_id)) - .await?; - - return Err(RequestError::NetworkError(err)); - } - }; - - debug!("parsed value: {:?}", info); - - cx.answer_str(format!( - "Your room temperature is {:.2} and humidity is {:.2}.", - info.temp, info.humidity - )) - .await? - } - Command::VersionRequest => { - cx.answer_str(format!( - "app version is {}@{}, uptime is {} second(-s)", - VERSION, - BRANCH, - startup.elapsed().unwrap().as_secs() - )) - .await? - } - }; - - Ok(()) -} - -#[derive(BotCommand, Debug)] +#[derive(BotCommands, Debug, Clone, PartialEq, Eq)] #[command(rename = "lowercase", description = "These commands are supported:")] enum Command { #[command(description = "display this text.")] diff --git a/src/migrator/main.rs b/src/migrator/main.rs new file mode 100644 index 0000000..834e6fb --- /dev/null +++ b/src/migrator/main.rs @@ -0,0 +1,42 @@ +use std::str::FromStr; + +use anyhow::Result; +use log::{debug, info}; +use envconfig::Envconfig; +use sqlx::{ + SqlitePool, + sqlite::SqliteConnectOptions, + migrate, +}; + +#[derive(Envconfig, Clone, Debug)] +struct Settings { + #[envconfig(from = "ALTEREGO_DATABASE_URL", default = "./db.sqlite")] + pub db_source: String, +} + + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::init(); + + info!("starting the application"); + + tokio::spawn(run()).await? +} + +async fn run() -> Result<()> { + debug!("running migrations"); + + let settings = Settings::init_from_env().expect("reading config values"); + + let opts = SqliteConnectOptions::from_str(&settings.db_source)? + .create_if_missing(true) + .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal); + + let pool = SqlitePool::connect_with(opts).await?; + + migrate!("./db/").run(&pool).await?; + + Ok(()) +} diff --git a/src/repo.rs b/src/repo.rs new file mode 100644 index 0000000..95f63b7 --- /dev/null +++ b/src/repo.rs @@ -0,0 +1,226 @@ +use std::str::FromStr; + +use anyhow::Result; +use log::debug; +use sqlx::{ + sqlite::{ + SqliteConnectOptions, SqliteError, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous, + }, + Executor, Pool, Sqlite, +}; +use teloxide::types::User; + +#[derive(Debug)] +pub struct SqliteConfig { + pub source: String, + pub timeout: std::time::Duration, + pub max_conns: u32, +} + +impl Default for SqliteConfig { + fn default() -> Self { + Self { + source: ":memory:".to_string(), + timeout: std::time::Duration::from_secs(10), + max_conns: 2, + } + } +} + +#[derive(Clone)] +pub struct SqliteRepo { + pool: Pool, +} + +impl SqliteRepo { + pub async fn from_config(config: SqliteConfig) -> Result { + let dsn = if !config.source.starts_with("sqlite://") { + format!("sqlite://{}", &config.source) + } else { + config.source + }; + + debug!("connecting to {}", dsn); + + let opts = SqliteConnectOptions::from_str(&dsn)? + .create_if_missing(true) + .journal_mode(SqliteJournalMode::Wal) + .synchronous(SqliteSynchronous::Normal) + .busy_timeout(config.timeout); + + let pool = SqlitePoolOptions::new() + .max_connections(config.max_conns) + .connect_timeout(config.timeout) + .connect_with(opts) + .await?; + + sqlx::query("pragma temp_store = memory;") + .execute(&pool) + .await?; + + sqlx::query("pragma mmap_size = 30000000000;") + .execute(&pool) + .await?; + + sqlx::query("pragma page_size = 4096;") + .execute(&pool) + .await?; + + Ok(Self { pool }) + } +} + +#[derive(sqlx::FromRow, Debug, Clone)] +pub struct UserDB { + pub user_id: i64, + pub chat_id: i64, + pub name: String, + pub created_at: chrono::NaiveDateTime, +} + +#[derive(sqlx::FromRow)] +pub struct ParameterDB { + pub param_id: i64, + pub user_id: i64, + pub key: String, + pub value: String, +} + +pub type MappedParameter = std::collections::HashMap; + +pub struct ParameterBase { + pub param_id: i64, + pub user_id: i64, + pub value: String, +} + +#[async_trait::async_trait] +pub trait Storage { + async fn create_user(&self, chat_id: i64, name: String) -> Result; + async fn get_user(&self, user_id: i64) -> Result>; + async fn load_user_by_chat_id(&self, chat_id: i64) -> Result>; + async fn get_user_parameters(&self, user_id: i64) -> Result; + + async fn upsert_parameter( + &self, + user_id: i64, + key: String, + value: String, + ) -> Result; + async fn delete_parameter(&self, param_id: i64) -> Result<()>; + + async fn insert_action(&self, user_id: i64, name: String) -> Result<()>; +} + +#[async_trait::async_trait] +impl Storage for SqliteRepo { + async fn create_user(&self, chat_id: i64, name: String) -> Result { + Ok(sqlx::query_as!( + UserDB, + "INSERT INTO users (chat_id, name, created_at)" + + " VALUES (?, ?, datetime('now'))" + + " RETURNING user_id, chat_id, name, created_at;", + chat_id, + name, + ) + .fetch_one(&self.pool) + .await?) + } + + async fn load_user_by_chat_id(&self, chat_id: i64) -> Result> { + let result: std::result::Result = sqlx::query_as!( + UserDB, + "SELECT user_id, chat_id, name, created_at" + " FROM users WHERE `chat_id` = ?;", + chat_id, + ) + .fetch_one(&self.pool) + .await; + + match result { + Ok(row) => Ok(Some(row)), + Err(err) => match err { + sqlx::Error::RowNotFound => Ok(None), + err => Err(anyhow::anyhow!(err)), + }, + } + } + + async fn get_user(&self, user_id: i64) -> Result> { + let result: std::result::Result = sqlx::query_as!( + UserDB, + "SELECT user_id, chat_id, name, created_at" + " FROM users WHERE `user_id` = ?;", + user_id, + ) + .fetch_one(&self.pool) + .await; + + match result { + Ok(row) => Ok(Some(row)), + Err(err) => match err { + sqlx::Error::RowNotFound => Ok(None), + err => Err(anyhow::anyhow!(err)), + }, + } + } + + async fn get_user_parameters(&self, user_id: i64) -> Result { + let mut mp: MappedParameter = std::collections::HashMap::new(); + sqlx::query_as!( + ParameterDB, + "SELECT `param_id`, `user_id`, `key`, `value`" + " FROM parameters WHERE `user_id` = ?", + user_id, + ) + .fetch_all(&self.pool) + .await? + .into_iter() + .for_each(|result| { + let param = ParameterBase { + param_id: result.param_id, + user_id: result.user_id, + value: result.value, + }; + + mp.insert(result.key, param); + }); + + Ok(mp) + } + + async fn upsert_parameter( + &self, + user_id: i64, + key: String, + value: String, + ) -> Result { + Ok(sqlx::query_as!( + ParameterDB, + "INSERT INTO parameters (`user_id`, `key`, `value`)" + + " VALUES (?, ?, ?)" + + " RETURNING `param_id`, `user_id`, `key`, `value`;", + user_id, + key, + value, + ) + .fetch_one(&self.pool) + .await?) + } + + async fn delete_parameter(&self, param_id: i64) -> Result<()> { + sqlx::query("DELETE FROM parameters WHERE `param_id` = ?;") + .bind(param_id) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn insert_action(&self, user_id: i64, name: String) -> Result<()> { + sqlx::query("INSERT INTO actions (`user_id`, `name`) VALUES (?, ?)") + .bind(user_id) + .bind(name) + .execute(&self.pool) + .await?; + + Ok(()) + } +} diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..d5931d3 --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,25 @@ +pub struct RequestID(String); + +impl std::fmt::Display for RequestID { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +#[derive(Clone)] +pub struct Generators; + +impl Generators { + pub fn new() -> Self { + Self + } + + pub fn next_request_id(&self) -> RequestID { + let data: u32 = rand::random(); + let data_hex = format!("{:x}", data); + + log::info!("issued new request: {}", data_hex); + + RequestID(data_hex) + } +}