Compare commits
3 Commits
master
...
feature/st
| Author | SHA1 | Date | |
|---|---|---|---|
| 41072b91d1 | |||
| fb03a110ed | |||
| 473953fc9c |
9
.env
9
.env
@ -1,3 +1,6 @@
|
||||
ALTEREGO_TELEGRAM_TOKEN=
|
||||
ALTEREGO_CLIMATE_DSN=
|
||||
ALTEREGO_HOSTTEMP_CMD=
|
||||
ALTEREGO_TELEGRAM_TOKEN=170515067:AAElDJ8Sq_oIqo9WaL4DKvUr13nSEIdHCYs
|
||||
ALTEREGO_CLIMATE_DSN=http://192.168.1.159/
|
||||
ALTEREGO_HOSTTEMP_CMD="/opt/vc/bin/vcgencmd measure_temp"
|
||||
ALTEREGO_DATABASE_URL=sqlite://.testdata/db.sqlite
|
||||
DATABASE_URL=sqlite://.testdata/db.sqlite
|
||||
RUST_LOG=debug
|
||||
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@ -1 +1,3 @@
|
||||
/target
|
||||
.testdata
|
||||
.vscode
|
||||
|
||||
1265
Cargo.lock
generated
1265
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
23
Cargo.toml
23
Cargo.toml
@ -1,20 +1,27 @@
|
||||
[package]
|
||||
name = "altherego"
|
||||
version = "0.9.5"
|
||||
version = "0.9.9"
|
||||
authors = ["Aleksandr Trushkin <aleksandr.trushkin@rt.ru>"]
|
||||
edition = "2018"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
teloxide = "0.3"
|
||||
teloxide-macros = "0.3"
|
||||
tokio = {version = "0.2.23", features = ["full"]}
|
||||
reqwest = "0.10.8"
|
||||
serde = "1.0.117"
|
||||
serde_json = "1.0.59"
|
||||
teloxide = { version = "0.9.0", features = ["macros", "auto-send"] }
|
||||
tokio = {version = "1.8", features = ["full"]}
|
||||
uuid = { version = "0.8.1", features = ["v4"] }
|
||||
log = "0.4"
|
||||
env_logger = "0.8.1"
|
||||
envconfig = "0.9.1"
|
||||
openssl = {version="*", features = ["vendored"]}
|
||||
serde = "1.0.137"
|
||||
reqwest = { version = "0.11.10", features = ["tokio-native-tls"] }
|
||||
serde_json = "1.0.81"
|
||||
sqlx = { version = "0.5.13", features = ["sqlite", "runtime-tokio-native-tls", "chrono", "migrate"] }
|
||||
chrono = "0.4.19"
|
||||
anyhow = "1.0.57"
|
||||
async-trait = "0.1.53"
|
||||
tokio-stream = "0.1.8"
|
||||
rand = "0.8.5"
|
||||
|
||||
[profile.dev.package.sqlx-macros]
|
||||
opt-level = 3
|
||||
|
||||
19
build.rs
19
build.rs
@ -12,10 +12,27 @@ fn main() {
|
||||
println!("cargo:rustc-env=GIT_REVISION={}", rev);
|
||||
println!("cargo:rustc-env=GIT_BRANCH={}", branch);
|
||||
println!("cargo:rerun-if-env-changed=GIT_REVISION");
|
||||
|
||||
if let Ok(data) = std::fs::read_to_string(".env") {
|
||||
data.split('\n').into_iter().for_each(|v| {
|
||||
if v.starts_with("DATABASE") {
|
||||
return;
|
||||
}
|
||||
|
||||
let kv: Vec<&str> = v.split('=').collect();
|
||||
if kv.len() != 2 {
|
||||
return;
|
||||
}
|
||||
|
||||
let (key, value) = (kv[0], kv[1]);
|
||||
println!("cargo:rustc-env={}={}", key, value);
|
||||
println!("cargo:rerun-if-env-changed={}", key);
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn get_value_from_env(key: &str) -> Option<String> {
|
||||
env::var(key).map_or_else(|_| None, |v| Some(v))
|
||||
env::var(key).map_or_else(|_| None, Some)
|
||||
}
|
||||
|
||||
fn get_value_from_command<I: IntoIterator<Item = S>, S: AsRef<std::ffi::OsStr>>(
|
||||
|
||||
33
db/001_initial.sql
Normal file
33
db/001_initial.sql
Normal file
@ -0,0 +1,33 @@
|
||||
CREATE TABLE IF NOT EXISTS users (
|
||||
`user_id` integer NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
`chat_id` integer NOT NULL UNIQUE,
|
||||
`name` VARCHAR(64) NOT NULL,
|
||||
created_at datetime NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS parameters (
|
||||
`param_id` integer NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
`user_id` integer NOT NULL,
|
||||
`key` VARCHAR(64) NOT NULL,
|
||||
`value` VARCHAR(64) NOT NULL,
|
||||
FOREIGN KEY(user_id)
|
||||
REFERENCES users(user_id)
|
||||
ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS actions_action_id_user_id_idx
|
||||
ON parameters (`user_id`, `key`);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS actions (
|
||||
`action_id` integer NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
`user_id` integer NOT NULL,
|
||||
`name` VARCHAR(64) NOT NULL,
|
||||
FOREIGN KEY(user_id)
|
||||
REFERENCES users(user_id)
|
||||
ON DELETE CASCADE
|
||||
);
|
||||
|
||||
-- drop index actions_action_id_user_id_idx;
|
||||
-- drop table users;
|
||||
-- drop table parameters;
|
||||
-- drop table actions;
|
||||
12
db/002_subscribers.sql
Normal file
12
db/002_subscribers.sql
Normal file
@ -0,0 +1,12 @@
|
||||
CREATE TABLE IF NOT EXISTS `subscribers` (
|
||||
`subscriber_id` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
`user_id` INTEGER NOT NULL,
|
||||
`kind` VARCHAR(16) NOT NULL,
|
||||
`arguments` JSON NOT NULL DEFAULT '{}',
|
||||
FOREIGN KEY(`user_id`)
|
||||
REFERENCES users(user_id)
|
||||
ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS subscribers_kind_user_id
|
||||
ON subscribers (`kind`, `user_id`);
|
||||
2
makefile
2
makefile
@ -8,7 +8,7 @@ DOCKERFLAGS:=-it --rm \
|
||||
DOCKERIMG:="rust-build-env:V1"
|
||||
|
||||
APP_NAME:=altherego
|
||||
IMAGE:=rust:1.49
|
||||
IMAGE:=rust:1.60
|
||||
TARGET_ARCH:=armv7-unknown-linux-gnueabihf
|
||||
|
||||
image:
|
||||
|
||||
63
src/climate.rs
Normal file
63
src/climate.rs
Normal file
@ -0,0 +1,63 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Client {
|
||||
addr: String,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize, Debug)]
|
||||
pub struct Climate {
|
||||
pub humidity: f32,
|
||||
pub temp: f32,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(addr: &str) -> Self {
|
||||
let addr = addr.to_owned();
|
||||
let client = reqwest::Client::builder()
|
||||
.connect_timeout(std::time::Duration::from_secs(2))
|
||||
.build()
|
||||
.expect("building client");
|
||||
|
||||
Self { addr, client }
|
||||
}
|
||||
|
||||
pub async fn fetch(&self) -> Result<Climate> {
|
||||
let request = self.client.get(&self.addr).build()?;
|
||||
Ok(self.client.execute(request).await?.json().await?)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SelfTemperature {
|
||||
cmd: String,
|
||||
arg: String,
|
||||
}
|
||||
|
||||
impl SelfTemperature {
|
||||
pub fn new(cmd: &str, arg: &str) -> Self {
|
||||
let cmd = cmd.to_owned();
|
||||
let arg = arg.to_owned();
|
||||
|
||||
Self { cmd, arg }
|
||||
}
|
||||
|
||||
pub async fn fetch(&self) -> Result<i64> {
|
||||
let output = tokio::process::Command::new(&self.cmd)
|
||||
.arg(&self.arg)
|
||||
.output()
|
||||
.await?;
|
||||
|
||||
if !output.status.success() {
|
||||
let msg = std::string::String::from_utf8(output.stderr)?;
|
||||
|
||||
return Err(anyhow!("getting temp from command: {}", msg));
|
||||
}
|
||||
|
||||
let output = std::string::String::from_utf8(output.stdout)?;
|
||||
let out = output.replace("temp=", "").parse()?;
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
}
|
||||
279
src/main.rs
279
src/main.rs
@ -1,20 +1,34 @@
|
||||
use env_logger;
|
||||
mod climate;
|
||||
mod repo;
|
||||
mod utils;
|
||||
|
||||
use anyhow::Result;
|
||||
use climate::SelfTemperature;
|
||||
use envconfig::Envconfig;
|
||||
use log::{debug, info, warn};
|
||||
use teloxide::{prelude::*, utils::command::BotCommand};
|
||||
use repo::UserDB;
|
||||
use teloxide::{dispatching::UpdateFilterExt, prelude::*, utils::command::BotCommands};
|
||||
|
||||
const VERSION: &'static str = env!("GIT_REVISION");
|
||||
const BRANCH: &'static str = env!("GIT_BRANCH");
|
||||
use crate::repo::Storage;
|
||||
|
||||
const VERSION: &str = env!("GIT_REVISION");
|
||||
const BRANCH: &str = env!("GIT_BRANCH");
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
async fn main() -> Result<()> {
|
||||
env_logger::init();
|
||||
|
||||
debug!("starting the application");
|
||||
|
||||
tokio::spawn(run()).await.unwrap();
|
||||
tokio::spawn(run()).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Envconfig, Clone)]
|
||||
#[derive(Envconfig, Clone, Debug)]
|
||||
struct Settings {
|
||||
/// Token is used to authenticate itself as a bot and being able
|
||||
/// to handle incoming commands and messages.
|
||||
#[envconfig(from = "ALTEREGO_TELEGRAM_TOKEN")]
|
||||
pub telegram_token: String,
|
||||
|
||||
@ -29,132 +43,175 @@ struct Settings {
|
||||
default = "/opt/vc/bin/vcgencmd measure_temp"
|
||||
)]
|
||||
pub hosttemp_cmd: String,
|
||||
|
||||
#[envconfig(from = "ALTEREGO_DATABASE_URL", default = "./db.sqlite")]
|
||||
pub db_source: String,
|
||||
}
|
||||
|
||||
async fn run() {
|
||||
env_logger::init();
|
||||
async fn run() -> anyhow::Result<()> {
|
||||
info!("starting");
|
||||
|
||||
let settings = Settings::init_from_env().expect("reading config values");
|
||||
let startup = std::sync::Arc::from(std::time::SystemTime::now());
|
||||
let bot = teloxide::Bot::new(&settings.telegram_token).auto_send();
|
||||
let migrate = std::env::args().any(|v| v == "migrate");
|
||||
let repo_config = repo::SqliteConfig {
|
||||
source: settings.db_source,
|
||||
migrate,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let bot = teloxide::Bot::builder()
|
||||
.token(&settings.telegram_token)
|
||||
.build();
|
||||
let bot_name = "AlterEgo";
|
||||
info!("repo config: {repo_config:?}");
|
||||
|
||||
teloxide::commands_repl(bot, bot_name, move |cx, command| {
|
||||
let climate = settings.climate_dsn.clone();
|
||||
let cmd: String = settings.hosttemp_cmd.clone();
|
||||
let cmd: Vec<&str> = cmd.split(" ").collect();
|
||||
let console_cmd = cmd.get(0).expect("getting console command").to_string();
|
||||
let arg: String = cmd.get(1).unwrap_or_else(|| &"").to_string();
|
||||
// let startup = std::sync::Arc::from(*startup);
|
||||
let startup = *startup;
|
||||
let sqlite_storage = repo::SqliteRepo::from_config(repo_config).await?;
|
||||
|
||||
async move { handler(cx, command, climate, console_cmd, arg, startup).await }
|
||||
})
|
||||
.await;
|
||||
let climate_client = climate::Client::new(&settings.climate_dsn);
|
||||
|
||||
let self_temp_client = {
|
||||
let splitted: Vec<&str> = settings.hosttemp_cmd.split(' ').into_iter().collect();
|
||||
let (cmd, arg) = (splitted[0], splitted[1]);
|
||||
|
||||
climate::SelfTemperature::new(cmd, arg)
|
||||
};
|
||||
|
||||
let handler = Update::filter_message()
|
||||
.filter_command::<Command>()
|
||||
.chain(dptree::filter(|msg: Message| msg.chat.is_private()))
|
||||
.chain(dptree::filter_map_async(find_user_mw))
|
||||
.branch(dptree::case![Command::RoomTemperature].endpoint(handle_temperature_sensor))
|
||||
.branch(dptree::case![Command::HostTemperature].endpoint(handle_host_temperature))
|
||||
.branch(dptree::case![Command::VersionRequest].endpoint(handle_version))
|
||||
.branch(dptree::case![Command::Help].endpoint(handle_help));
|
||||
|
||||
let mut dependencies = DependencyMap::new();
|
||||
dependencies.insert(sqlite_storage);
|
||||
dependencies.insert(climate_client);
|
||||
dependencies.insert(self_temp_client);
|
||||
dependencies.insert(utils::Generators::new());
|
||||
|
||||
info!("running");
|
||||
|
||||
Dispatcher::builder(bot, handler)
|
||||
.dependencies(dependencies)
|
||||
.default_handler(|upd| async move {
|
||||
warn!("unhandled update: {:?}", upd);
|
||||
})
|
||||
.build()
|
||||
.setup_ctrlc_handler()
|
||||
.dispatch()
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize, Debug)]
|
||||
struct Climate {
|
||||
humidity: f32,
|
||||
temp: f32,
|
||||
async fn find_user_mw(msg: Message, storage: repo::SqliteRepo) -> Option<UserDB> {
|
||||
let chat_id = msg.chat.id.0;
|
||||
|
||||
info!("checking if the user with chat_id={chat_id} exists");
|
||||
let user = storage.load_user_by_chat_id(chat_id).await.unwrap();
|
||||
match user {
|
||||
Some(user) => Some(user),
|
||||
None => {
|
||||
let fname = msg.chat.first_name().unwrap_or_default();
|
||||
let lname = msg.chat.last_name().unwrap_or_default();
|
||||
|
||||
info!("performing upsert for user {lname} {fname}");
|
||||
|
||||
let user_db = storage
|
||||
.create_user(chat_id, [fname, lname].join(" "))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
info!("upserted user {user_db:?}");
|
||||
|
||||
Some(user_db)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handler(
|
||||
cx: UpdateWithCx<Message>,
|
||||
command: Command,
|
||||
dsn: String,
|
||||
console_command: String,
|
||||
console_arg: String,
|
||||
startup: std::time::SystemTime,
|
||||
) -> ResponseResult<()> {
|
||||
let request_id = uuid::Uuid::new_v4();
|
||||
fn error_msg(reqid: &utils::RequestID) -> String {
|
||||
format!("There was an error handling command, sorry. Reffer to {reqid}")
|
||||
}
|
||||
|
||||
info!(
|
||||
"incoming request xreqid={} command={:?}",
|
||||
request_id, command
|
||||
);
|
||||
async fn handle_temperature_sensor(
|
||||
bot: AutoSend<Bot>,
|
||||
msg: Message,
|
||||
climate: climate::Client,
|
||||
next_req_id: utils::Generators,
|
||||
user: repo::UserDB,
|
||||
) -> Result<()> {
|
||||
let chat_id = msg.chat.id;
|
||||
let reqid = next_req_id.next_request_id();
|
||||
|
||||
match command {
|
||||
Command::Help => cx.answer(Command::descriptions()).send().await?,
|
||||
Command::HostTemperature => {
|
||||
info!(
|
||||
"querying command {} with arg {}",
|
||||
console_command, console_arg
|
||||
let name = user.name;
|
||||
bot.send_message(chat_id, format!("Just a second, {name}, asking..."))
|
||||
.await?;
|
||||
|
||||
match climate.fetch().await {
|
||||
Ok(temp) => {
|
||||
let text = format!(
|
||||
"Humidity is {} and temperature is {}",
|
||||
temp.humidity, temp.temp,
|
||||
);
|
||||
|
||||
let cmd = std::process::Command::new(&console_command)
|
||||
.arg(&console_arg)
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.spawn()
|
||||
.expect("running vcgencmd command");
|
||||
|
||||
let output = cmd.wait_with_output().expect("waiting for output");
|
||||
|
||||
let parsed =
|
||||
std::string::String::from_utf8(output.stdout).expect("casting into string");
|
||||
|
||||
let parsed = parsed.replace("temp=", "");
|
||||
|
||||
cx.answer_str(format!("Your Raspberry PI temperature is {}", parsed))
|
||||
.await?
|
||||
bot.send_message(chat_id, text).await?;
|
||||
}
|
||||
Command::RoomTemperature => {
|
||||
info!("sending request to {}", dsn);
|
||||
|
||||
let response = match reqwest::get(&dsn).await {
|
||||
Ok(response) => response,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"unable to handle request xreqid={} error={:?}",
|
||||
request_id, err
|
||||
);
|
||||
cx.answer_str(format!("something went wrong, reference to {}", request_id))
|
||||
.await?;
|
||||
|
||||
return Err(RequestError::NetworkError(err));
|
||||
}
|
||||
};
|
||||
|
||||
let info: Climate = match response.json::<Climate>().await {
|
||||
Ok(result) => result,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"unable to handle request xreqid={} error={:?}",
|
||||
request_id, err
|
||||
);
|
||||
cx.answer_str(format!("something went wrong, reference to {}", request_id))
|
||||
.await?;
|
||||
|
||||
return Err(RequestError::NetworkError(err));
|
||||
}
|
||||
};
|
||||
|
||||
debug!("parsed value: {:?}", info);
|
||||
|
||||
cx.answer_str(format!(
|
||||
"Your room temperature is {:.2} and humidity is {:.2}.",
|
||||
info.temp, info.humidity
|
||||
))
|
||||
.await?
|
||||
}
|
||||
Command::VersionRequest => {
|
||||
cx.answer_str(format!(
|
||||
"app version is {}@{}, uptime is {} second(-s)",
|
||||
VERSION,
|
||||
BRANCH,
|
||||
startup.elapsed().unwrap().as_secs()
|
||||
))
|
||||
.await?
|
||||
Err(err) => {
|
||||
warn!("[{reqid}] unable to fetch self_temp: {err}");
|
||||
let msg = error_msg(&reqid);
|
||||
bot.send_message(chat_id, msg).await?;
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(BotCommand, Debug)]
|
||||
async fn handle_host_temperature(
|
||||
bot: AutoSend<Bot>,
|
||||
msg: Message,
|
||||
temp: SelfTemperature,
|
||||
next_req_id: utils::Generators,
|
||||
) -> Result<()> {
|
||||
let chat_id = msg.chat.id;
|
||||
let reqid = next_req_id.next_request_id();
|
||||
|
||||
bot.send_message(chat_id, "Just a second, asking...")
|
||||
.await?;
|
||||
|
||||
match temp.fetch().await {
|
||||
Ok(temp) => {
|
||||
let text = format!("Host temperature is {} degrees celcius", temp,);
|
||||
|
||||
bot.send_message(chat_id, text).await?;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("[{reqid}] unable to fetch self_temp: {err}");
|
||||
let msg = error_msg(&reqid);
|
||||
bot.send_message(chat_id, msg).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_version(bot: AutoSend<Bot>, msg: Message) -> Result<()> {
|
||||
let chat_id = msg.chat.id;
|
||||
let text = format!("Bot version is {} (branch: {})", VERSION, BRANCH,);
|
||||
|
||||
bot.send_message(chat_id, text).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_help(bot: AutoSend<Bot>, msg: Message) -> Result<()> {
|
||||
let chat_id = msg.chat.id;
|
||||
|
||||
bot.send_message(chat_id, Command::descriptions().to_string())
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(BotCommands, Debug, Clone, PartialEq, Eq)]
|
||||
#[command(rename = "lowercase", description = "These commands are supported:")]
|
||||
enum Command {
|
||||
#[command(description = "display this text.")]
|
||||
|
||||
386
src/repo.rs
Normal file
386
src/repo.rs
Normal file
@ -0,0 +1,386 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Result;
|
||||
use log::debug;
|
||||
use sqlx::{
|
||||
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous},
|
||||
FromRow, Pool, Sqlite,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SqliteConfig {
|
||||
pub source: String,
|
||||
pub timeout: std::time::Duration,
|
||||
pub max_conns: u32,
|
||||
pub migrate: bool,
|
||||
}
|
||||
|
||||
impl Default for SqliteConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
source: "sqlite::memory:".to_string(),
|
||||
timeout: std::time::Duration::from_secs(10),
|
||||
max_conns: 1,
|
||||
migrate: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SqliteRepo {
|
||||
pool: Pool<Sqlite>,
|
||||
}
|
||||
|
||||
impl SqliteRepo {
|
||||
pub async fn from_config(config: SqliteConfig) -> Result<Self> {
|
||||
let dsn = if !config.source.starts_with("sqlite:") {
|
||||
format!("sqlite:{}", &config.source)
|
||||
} else {
|
||||
config.source
|
||||
};
|
||||
|
||||
debug!("connecting to {}", dsn);
|
||||
|
||||
let opts = SqliteConnectOptions::from_str(&dsn)?
|
||||
.create_if_missing(true)
|
||||
.journal_mode(SqliteJournalMode::Wal)
|
||||
.synchronous(SqliteSynchronous::Normal)
|
||||
.busy_timeout(config.timeout);
|
||||
|
||||
let pool = SqlitePoolOptions::new()
|
||||
.max_connections(config.max_conns)
|
||||
.connect_timeout(config.timeout)
|
||||
.connect_with(opts)
|
||||
.await?;
|
||||
|
||||
sqlx::migrate!("./db").run(&pool).await?;
|
||||
|
||||
sqlx::query("pragma temp_store = memory;")
|
||||
.execute(&pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query("pragma mmap_size = 30000000000;")
|
||||
.execute(&pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query("pragma page_size = 4096;")
|
||||
.execute(&pool)
|
||||
.await?;
|
||||
|
||||
Ok(Self { pool })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FromRow, Debug, Clone, PartialEq)]
|
||||
pub struct UserDB {
|
||||
pub user_id: i64,
|
||||
pub chat_id: i64,
|
||||
pub name: String,
|
||||
pub created_at: chrono::NaiveDateTime,
|
||||
}
|
||||
|
||||
#[derive(FromRow)]
|
||||
pub struct ParameterDB {
|
||||
pub param_id: i64,
|
||||
pub user_id: i64,
|
||||
pub key: String,
|
||||
pub value: String,
|
||||
}
|
||||
|
||||
pub type MappedParameter = std::collections::HashMap<String, ParameterBase>;
|
||||
|
||||
pub struct ParameterBase {
|
||||
pub param_id: i64,
|
||||
pub user_id: i64,
|
||||
pub value: String,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Storage {
|
||||
async fn create_user(&self, chat_id: i64, name: String) -> Result<UserDB>;
|
||||
async fn get_user(&self, user_id: i64) -> Result<Option<UserDB>>;
|
||||
async fn load_user_by_chat_id(&self, chat_id: i64) -> Result<Option<UserDB>>;
|
||||
async fn get_user_parameters(&self, user_id: i64) -> Result<MappedParameter>;
|
||||
|
||||
async fn upsert_parameter(&self, user_id: i64, key: &str, value: &str) -> Result<ParameterDB>;
|
||||
|
||||
async fn insert_action(&self, user_id: i64, name: &str) -> Result<()>;
|
||||
}
|
||||
|
||||
type SQLResult<T> = sqlx::Result<T>;
|
||||
|
||||
async fn create_user(chat_id: i64, name: String, pool: &Pool<Sqlite>) -> SQLResult<UserDB> {
|
||||
sqlx::query_as(
|
||||
"INSERT INTO `users` (`chat_id`, `name`, `created_at`)
|
||||
VALUES (?, ?, datetime('now'))
|
||||
RETURNING `user_id`, `chat_id`, `name`, `created_at`;",
|
||||
)
|
||||
.bind(chat_id)
|
||||
.bind(name)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
}
|
||||
|
||||
struct FindUserParams {
|
||||
pub(crate) user_id: Option<i64>,
|
||||
pub(crate) chat_id: Option<i64>,
|
||||
}
|
||||
|
||||
impl FindUserParams {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
user_id: None,
|
||||
chat_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_user_id(mut self, user_id: i64) -> Self {
|
||||
self.user_id = Some(user_id);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_chat_id(mut self, chat_id: i64) -> Self {
|
||||
self.chat_id = Some(chat_id);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
async fn find_user(params: FindUserParams, executor: &Pool<Sqlite>) -> sqlx::Result<UserDB> {
|
||||
let mut qb = sqlx::QueryBuilder::new(
|
||||
"SELECT `user_id`, `chat_id`, `name`, `created_at` FROM `users` WHERE 1=1",
|
||||
);
|
||||
if let Some(user_id) = params.user_id {
|
||||
qb.push(" AND `user_id` = ");
|
||||
qb.push_bind(user_id);
|
||||
};
|
||||
|
||||
if let Some(chat_id) = params.chat_id {
|
||||
qb.push(" AND `chat_id` = ");
|
||||
qb.push_bind(chat_id);
|
||||
}
|
||||
|
||||
let row = qb.build().fetch_one(executor).await?;
|
||||
UserDB::from_row(&row)
|
||||
}
|
||||
|
||||
async fn get_parameters_by_user(user_id: i64, pool: &Pool<Sqlite>) -> SQLResult<MappedParameter> {
|
||||
let mut mp: MappedParameter = std::collections::HashMap::new();
|
||||
sqlx::query_as(
|
||||
"SELECT `param_id`, `user_id`, `key`, `value` FROM parameters WHERE `user_id` = ?",
|
||||
)
|
||||
.bind(user_id)
|
||||
.fetch_all(pool)
|
||||
.await?
|
||||
.into_iter()
|
||||
.for_each(|result: ParameterDB| {
|
||||
let param = ParameterBase {
|
||||
param_id: result.param_id,
|
||||
user_id: result.user_id,
|
||||
value: result.value,
|
||||
};
|
||||
|
||||
mp.insert(result.key, param);
|
||||
});
|
||||
|
||||
Ok(mp)
|
||||
}
|
||||
|
||||
async fn upsert_parameter_for_user(
|
||||
user_id: i64,
|
||||
key: &str,
|
||||
value: &str,
|
||||
pool: &Pool<Sqlite>,
|
||||
) -> SQLResult<ParameterDB> {
|
||||
sqlx::query_as(
|
||||
"INSERT INTO parameters (`user_id`, `key`, `value`)
|
||||
VALUES (?, ?, ?)
|
||||
RETURNING `param_id`, `user_id`, `key`, `value`;",
|
||||
)
|
||||
.bind(user_id)
|
||||
.bind(key)
|
||||
.bind(value)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn insert_user_action(user_id: i64, name: &str, pool: &Pool<Sqlite>) -> Result<()> {
|
||||
sqlx::query("INSERT INTO actions (`user_id`, `name`) VALUES (?, ?)")
|
||||
.bind(user_id)
|
||||
.bind(name)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn subscriber_user<S>(
|
||||
user_id: i64,
|
||||
kind: &str,
|
||||
args: Option<&S>,
|
||||
executor: &Pool<Sqlite>,
|
||||
) -> Result<()>
|
||||
where
|
||||
S: serde::Serialize,
|
||||
{
|
||||
let args_out = match args {
|
||||
Some(args) => Some(serde_json::to_string(args)?),
|
||||
None => None,
|
||||
};
|
||||
|
||||
sqlx::query("INSERT INTO subscribers (`user_id`, `kind`, `args`) VALUES (?, ?, ?)")
|
||||
.bind(user_id)
|
||||
.bind(kind)
|
||||
.bind(args_out)
|
||||
.execute(executor)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn unsubscribe_user(user_id: i64, kind: &str, executor: &Pool<Sqlite>) -> Result<()> {
|
||||
sqlx::query("DELETE FROM subscribers WHERE `user_id` = ? AND `kind` = ?")
|
||||
.bind(user_id)
|
||||
.bind(kind)
|
||||
.execute(executor)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(FromRow, Debug)]
|
||||
pub struct SubscriptionDB {
|
||||
pub subscribe_id: i64,
|
||||
pub user_id: i64,
|
||||
pub kind: String,
|
||||
pub args: String,
|
||||
}
|
||||
|
||||
async fn find_subscribers_by_kind(
|
||||
kind: &str,
|
||||
executor: &Pool<Sqlite>,
|
||||
) -> Result<Vec<SubscriptionDB>> {
|
||||
Ok(sqlx::query_as( "SELECT `subscriber_id`, `user_id`, `kind`, `arguments` FROM `subcribers` WHERE `kind` = ?")
|
||||
.bind(kind)
|
||||
.fetch_all(executor)
|
||||
.await?)
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Storage for SqliteRepo {
|
||||
async fn create_user(&self, chat_id: i64, name: String) -> Result<UserDB> {
|
||||
Ok(create_user(chat_id, name, &self.pool).await?)
|
||||
}
|
||||
|
||||
async fn load_user_by_chat_id(&self, chat_id: i64) -> Result<Option<UserDB>> {
|
||||
let params = FindUserParams::new().with_chat_id(chat_id);
|
||||
match find_user(params, &self.pool).await {
|
||||
Ok(row) => Ok(Some(row)),
|
||||
Err(err) => match err {
|
||||
sqlx::Error::RowNotFound => Ok(None),
|
||||
err => Err(anyhow::anyhow!(err)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_user(&self, user_id: i64) -> Result<Option<UserDB>> {
|
||||
let params = FindUserParams::new().with_user_id(user_id);
|
||||
match find_user(params, &self.pool).await {
|
||||
Ok(row) => Ok(Some(row)),
|
||||
Err(err) => match err {
|
||||
sqlx::Error::RowNotFound => Ok(None),
|
||||
err => Err(anyhow::anyhow!(err)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_user_parameters(&self, user_id: i64) -> Result<MappedParameter> {
|
||||
Ok(get_parameters_by_user(user_id, &self.pool).await?)
|
||||
}
|
||||
|
||||
async fn upsert_parameter(&self, user_id: i64, key: &str, value: &str) -> Result<ParameterDB> {
|
||||
Ok(upsert_parameter_for_user(user_id, key, value, &self.pool).await?)
|
||||
}
|
||||
|
||||
async fn insert_action(&self, user_id: i64, name: &str) -> Result<()> {
|
||||
insert_user_action(user_id, name, &self.pool).await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::Once;
|
||||
|
||||
static ONCE: Once = Once::new();
|
||||
|
||||
async fn prepare() -> Result<Pool<Sqlite>> {
|
||||
const DSN: &str = "sqlite::memory:";
|
||||
|
||||
ONCE.call_once(|| {
|
||||
std::env::set_var("RUST_LOG", "debug");
|
||||
env_logger::init();
|
||||
});
|
||||
|
||||
let pool = SqlitePoolOptions::new().connect(DSN).await?;
|
||||
sqlx::migrate!("./db").run(&pool).await?;
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO `users` (user_id, chat_id, name, created_at)
|
||||
VALUES (1, 100, 'Alex', datetime('now'));",
|
||||
)
|
||||
.execute(&pool)
|
||||
.await?;
|
||||
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_get_user_by_chat_id() {
|
||||
let executor = prepare().await.expect("should prepare store");
|
||||
let params = FindUserParams::new().with_chat_id(100);
|
||||
let user = find_user(params, &executor)
|
||||
.await
|
||||
.expect("should found user");
|
||||
|
||||
let exp_user = UserDB {
|
||||
user_id: 1,
|
||||
chat_id: 100,
|
||||
name: "Alex".to_string(),
|
||||
created_at: user.created_at,
|
||||
};
|
||||
assert_eq!(exp_user, user);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_get_user_by_user_id() {
|
||||
let executor = prepare().await.expect("should prepare store");
|
||||
let params = FindUserParams::new().with_user_id(1);
|
||||
let user = find_user(params, &executor)
|
||||
.await
|
||||
.expect("should found user");
|
||||
|
||||
let exp_user = UserDB {
|
||||
user_id: 1,
|
||||
chat_id: 100,
|
||||
name: "Alex".to_string(),
|
||||
created_at: user.created_at,
|
||||
};
|
||||
assert_eq!(exp_user, user);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_create_user() {
|
||||
let pool = prepare().await.expect("should prepare store");
|
||||
let user = create_user(101, "Phew".to_owned(), &pool)
|
||||
.await
|
||||
.expect("should create user");
|
||||
|
||||
let exp_user = UserDB {
|
||||
user_id: user.user_id,
|
||||
chat_id: 101,
|
||||
name: "Phew".to_string(),
|
||||
created_at: user.created_at,
|
||||
};
|
||||
assert_eq!(exp_user, user);
|
||||
}
|
||||
}
|
||||
25
src/utils.rs
Normal file
25
src/utils.rs
Normal file
@ -0,0 +1,25 @@
|
||||
pub struct RequestID(String);
|
||||
|
||||
impl std::fmt::Display for RequestID {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Generators;
|
||||
|
||||
impl Generators {
|
||||
pub fn new() -> Self {
|
||||
Self
|
||||
}
|
||||
|
||||
pub fn next_request_id(&self) -> RequestID {
|
||||
let data: u32 = rand::random();
|
||||
let data_hex = format!("{:x}", data);
|
||||
|
||||
log::info!("issued new request: {}", data_hex);
|
||||
|
||||
RequestID(data_hex)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user