Compare commits

...

4 Commits

6 changed files with 188 additions and 48 deletions

View File

@ -50,10 +50,12 @@ async fn socket_activate(config: ServerConfig) -> anyhow::Result<()> {
// TODO: allow getting socket path from other socket activation sources // TODO: allow getting socket path from other socket activation sources
let conn = get_socket_from_systemd().await?; let conn = get_socket_from_systemd().await?;
let uid = conn.peer_cred()?.uid(); let uid = conn.peer_cred()?.uid();
let unix_user = UnixUser::from_uid(uid.into())?; let unix_user = UnixUser::from_uid(uid)?;
log::info!("Accepted connection from {}", unix_user.username); log::info!("Accepted connection from {}", unix_user.username);
sd_notify::notify(true, &[sd_notify::NotifyState::Ready]).ok();
handle_requests_for_single_session(conn, &unix_user, &config).await?; handle_requests_for_single_session(conn, &unix_user, &config).await?;
Ok(()) Ok(())

View File

@ -102,8 +102,11 @@ pub async fn create_mysql_connection_from_config(
config: &MysqlConfig, config: &MysqlConfig,
) -> anyhow::Result<MySqlConnection> { ) -> anyhow::Result<MySqlConnection> {
let mut display_config = config.clone(); let mut display_config = config.clone();
display_config.password = "<REDACTED>".to_owned(); "<REDACTED>".clone_into(&mut display_config.password);
log::debug!("Connecting to MySQL server with parameters: {:#?}", display_config); log::debug!(
"Connecting to MySQL server with parameters: {:#?}",
display_config
);
match tokio::time::timeout( match tokio::time::timeout(
Duration::from_secs(config.timeout.unwrap_or(DEFAULT_TIMEOUT)), Duration::from_secs(config.timeout.unwrap_or(DEFAULT_TIMEOUT)),

View File

@ -57,11 +57,13 @@ pub async fn listen_for_incoming_connections(
let listener = UnixListener::bind(socket_path)?; let listener = UnixListener::bind(socket_path)?;
sd_notify::notify(true, &[sd_notify::NotifyState::Ready]).ok();
while let Ok((mut conn, _addr)) = listener.accept().await { while let Ok((mut conn, _addr)) = listener.accept().await {
let uid = conn.peer_cred()?.uid(); let uid = conn.peer_cred()?.uid();
log::trace!("Accepted connection from uid {}", uid); log::trace!("Accepted connection from uid {}", uid);
let unix_user = match UnixUser::from_uid(uid.into()) { let unix_user = match UnixUser::from_uid(uid) {
Ok(user) => user, Ok(user) => user,
Err(e) => { Err(e) => {
eprintln!("Failed to get UnixUser from uid: {}", e); eprintln!("Failed to get UnixUser from uid: {}", e);

View File

@ -26,9 +26,17 @@ pub(super) async fn unsafe_database_exists(
sqlx::query("SELECT SCHEMA_NAME FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = ?") sqlx::query("SELECT SCHEMA_NAME FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = ?")
.bind(database_name) .bind(database_name)
.fetch_optional(connection) .fetch_optional(connection)
.await?; .await;
Ok(result.is_some()) if let Err(err) = &result {
log::error!(
"Failed to check if database '{}' exists: {:?}",
&database_name,
err
);
}
Ok(result?.is_some())
} }
pub async fn create_databases( pub async fn create_databases(
@ -80,6 +88,10 @@ pub async fn create_databases(
.map(|_| ()) .map(|_| ())
.map_err(|err| CreateDatabaseError::MySqlError(err.to_string())); .map_err(|err| CreateDatabaseError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!("Failed to create database '{}': {:?}", &database_name, err);
}
results.insert(database_name, result); results.insert(database_name, result);
} }
@ -135,6 +147,10 @@ pub async fn drop_databases(
.map(|_| ()) .map(|_| ())
.map_err(|err| DropDatabaseError::MySqlError(err.to_string())); .map_err(|err| DropDatabaseError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!("Failed to drop database '{}': {:?}", &database_name, err);
}
results.insert(database_name, result); results.insert(database_name, result);
} }
@ -145,7 +161,7 @@ pub async fn list_databases_for_user(
unix_user: &UnixUser, unix_user: &UnixUser,
connection: &mut MySqlConnection, connection: &mut MySqlConnection,
) -> Result<Vec<String>, ListDatabasesError> { ) -> Result<Vec<String>, ListDatabasesError> {
sqlx::query( let result = sqlx::query(
r#" r#"
SELECT `SCHEMA_NAME` AS `database` SELECT `SCHEMA_NAME` AS `database`
FROM `information_schema`.`SCHEMATA` FROM `information_schema`.`SCHEMATA`
@ -161,5 +177,15 @@ pub async fn list_databases_for_user(
.map(|row| row.try_get::<String, _>("database")) .map(|row| row.try_get::<String, _>("database"))
.collect::<Result<Vec<String>, sqlx::Error>>() .collect::<Result<Vec<String>, sqlx::Error>>()
}) })
.map_err(|err| ListDatabasesError::MySqlError(err.to_string())) .map_err(|err| ListDatabasesError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!(
"Failed to list databases for user '{}': {:?}",
unix_user.username,
err
);
}
result
} }

View File

@ -136,7 +136,7 @@ async fn unsafe_get_database_privileges(
database_name: &str, database_name: &str,
connection: &mut MySqlConnection, connection: &mut MySqlConnection,
) -> Result<Vec<DatabasePrivilegeRow>, sqlx::Error> { ) -> Result<Vec<DatabasePrivilegeRow>, sqlx::Error> {
sqlx::query_as::<_, DatabasePrivilegeRow>(&format!( let result = sqlx::query_as::<_, DatabasePrivilegeRow>(&format!(
"SELECT {} FROM `db` WHERE `db` = ?", "SELECT {} FROM `db` WHERE `db` = ?",
DATABASE_PRIVILEGE_FIELDS DATABASE_PRIVILEGE_FIELDS
.iter() .iter()
@ -145,7 +145,17 @@ async fn unsafe_get_database_privileges(
)) ))
.bind(database_name) .bind(database_name)
.fetch_all(connection) .fetch_all(connection)
.await .await;
if let Err(e) = &result {
log::error!(
"Failed to get database privileges for '{}': {}",
&database_name,
e
);
}
result
} }
// NOTE: this function is unsafe because it does no input validation. // NOTE: this function is unsafe because it does no input validation.
@ -155,7 +165,7 @@ pub async fn unsafe_get_database_privileges_for_db_user_pair(
user_name: &str, user_name: &str,
connection: &mut MySqlConnection, connection: &mut MySqlConnection,
) -> Result<Option<DatabasePrivilegeRow>, sqlx::Error> { ) -> Result<Option<DatabasePrivilegeRow>, sqlx::Error> {
sqlx::query_as::<_, DatabasePrivilegeRow>(&format!( let result = sqlx::query_as::<_, DatabasePrivilegeRow>(&format!(
"SELECT {} FROM `db` WHERE `db` = ? AND `user` = ?", "SELECT {} FROM `db` WHERE `db` = ? AND `user` = ?",
DATABASE_PRIVILEGE_FIELDS DATABASE_PRIVILEGE_FIELDS
.iter() .iter()
@ -165,7 +175,18 @@ pub async fn unsafe_get_database_privileges_for_db_user_pair(
.bind(database_name) .bind(database_name)
.bind(user_name) .bind(user_name)
.fetch_optional(connection) .fetch_optional(connection)
.await .await;
if let Err(e) = &result {
log::error!(
"Failed to get database privileges for '{}.{}': {}",
&database_name,
&user_name,
e
);
}
result
} }
pub async fn get_databases_privilege_data( pub async fn get_databases_privilege_data(
@ -220,7 +241,7 @@ pub async fn get_all_database_privileges(
unix_user: &UnixUser, unix_user: &UnixUser,
connection: &mut MySqlConnection, connection: &mut MySqlConnection,
) -> GetAllDatabasesPrivilegeData { ) -> GetAllDatabasesPrivilegeData {
sqlx::query_as::<_, DatabasePrivilegeRow>(&format!( let result = sqlx::query_as::<_, DatabasePrivilegeRow>(&format!(
indoc! {r#" indoc! {r#"
SELECT {} FROM `db` WHERE `db` IN SELECT {} FROM `db` WHERE `db` IN
(SELECT DISTINCT `SCHEMA_NAME` AS `database` (SELECT DISTINCT `SCHEMA_NAME` AS `database`
@ -236,14 +257,20 @@ pub async fn get_all_database_privileges(
.bind(create_user_group_matching_regex(unix_user)) .bind(create_user_group_matching_regex(unix_user))
.fetch_all(connection) .fetch_all(connection)
.await .await
.map_err(|e| GetAllDatabasesPrivilegeDataError::MySqlError(e.to_string())) .map_err(|e| GetAllDatabasesPrivilegeDataError::MySqlError(e.to_string()));
if let Err(e) = &result {
log::error!("Failed to get all database privileges: {:?}", e);
}
result
} }
async fn unsafe_apply_privilege_diff( async fn unsafe_apply_privilege_diff(
database_privilege_diff: &DatabasePrivilegesDiff, database_privilege_diff: &DatabasePrivilegesDiff,
connection: &mut MySqlConnection, connection: &mut MySqlConnection,
) -> Result<(), sqlx::Error> { ) -> Result<(), sqlx::Error> {
match database_privilege_diff { let result = match database_privilege_diff {
DatabasePrivilegesDiff::New(p) => { DatabasePrivilegesDiff::New(p) => {
let tables = DATABASE_PRIVILEGE_FIELDS let tables = DATABASE_PRIVILEGE_FIELDS
.iter() .iter()
@ -305,7 +332,13 @@ async fn unsafe_apply_privilege_diff(
.await .await
.map(|_| ()) .map(|_| ())
} }
};
if let Err(e) = &result {
log::error!("Failed to apply database privilege diff: {}", e);
} }
result
} }
async fn validate_diff( async fn validate_diff(

View File

@ -1,6 +1,6 @@
use indoc::formatdoc;
use itertools::Itertools; use itertools::Itertools;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use indoc::formatdoc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -29,7 +29,7 @@ async fn unsafe_user_exists(
db_user: &str, db_user: &str,
connection: &mut MySqlConnection, connection: &mut MySqlConnection,
) -> Result<bool, sqlx::Error> { ) -> Result<bool, sqlx::Error> {
sqlx::query( let result = sqlx::query(
r#" r#"
SELECT EXISTS( SELECT EXISTS(
SELECT 1 SELECT 1
@ -41,7 +41,13 @@ async fn unsafe_user_exists(
.bind(db_user) .bind(db_user)
.fetch_one(connection) .fetch_one(connection)
.await .await
.map(|row| row.get::<bool, _>(0)) .map(|row| row.get::<bool, _>(0));
if let Err(err) = &result {
log::error!("Failed to check if database user exists: {:?}", err);
}
result
} }
pub async fn create_database_users( pub async fn create_database_users(
@ -80,6 +86,10 @@ pub async fn create_database_users(
.map(|_| ()) .map(|_| ())
.map_err(|err| CreateUserError::MySqlError(err.to_string())); .map_err(|err| CreateUserError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!("Failed to create database user '{}': {:?}", &db_user, err);
}
results.insert(db_user, result); results.insert(db_user, result);
} }
@ -122,6 +132,10 @@ pub async fn drop_database_users(
.map(|_| ()) .map(|_| ())
.map_err(|err| DropUserError::MySqlError(err.to_string())); .map_err(|err| DropUserError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!("Failed to drop database user '{}': {:?}", &db_user, err);
}
results.insert(db_user, result); results.insert(db_user, result);
} }
@ -148,7 +162,7 @@ pub async fn set_password_for_database_user(
_ => {} _ => {}
} }
sqlx::query( let result = sqlx::query(
format!( format!(
"ALTER USER {}@'%' IDENTIFIED BY {}", "ALTER USER {}@'%' IDENTIFIED BY {}",
quote_literal(db_user), quote_literal(db_user),
@ -159,7 +173,17 @@ pub async fn set_password_for_database_user(
.execute(&mut *connection) .execute(&mut *connection)
.await .await
.map(|_| ()) .map(|_| ())
.map_err(|err| SetPasswordError::MySqlError(err.to_string())) .map_err(|err| SetPasswordError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!(
"Failed to set password for database user '{}': {:?}",
&db_user,
err
);
}
result
} }
// NOTE: this function is unsafe because it does no input validation. // NOTE: this function is unsafe because it does no input validation.
@ -167,7 +191,7 @@ async fn database_user_is_locked_unsafe(
db_user: &str, db_user: &str,
connection: &mut MySqlConnection, connection: &mut MySqlConnection,
) -> Result<bool, sqlx::Error> { ) -> Result<bool, sqlx::Error> {
sqlx::query( let result = sqlx::query(
r#" r#"
SELECT COALESCE( SELECT COALESCE(
JSON_EXTRACT(`mysql`.`global_priv`.`priv`, "$.account_locked"), JSON_EXTRACT(`mysql`.`global_priv`.`priv`, "$.account_locked"),
@ -181,7 +205,17 @@ async fn database_user_is_locked_unsafe(
.bind(db_user) .bind(db_user)
.fetch_one(connection) .fetch_one(connection)
.await .await
.map(|row| row.get::<bool, _>(0)) .map(|row| row.get::<bool, _>(0));
if let Err(err) = &result {
log::error!(
"Failed to check if database user is locked '{}': {:?}",
&db_user,
err
);
}
result
} }
pub async fn lock_database_users( pub async fn lock_database_users(
@ -234,6 +268,10 @@ pub async fn lock_database_users(
.map(|_| ()) .map(|_| ())
.map_err(|err| LockUserError::MySqlError(err.to_string())); .map_err(|err| LockUserError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!("Failed to lock database user '{}': {:?}", &db_user, err);
}
results.insert(db_user, result); results.insert(db_user, result);
} }
@ -290,6 +328,10 @@ pub async fn unlock_database_users(
.map(|_| ()) .map(|_| ())
.map_err(|err| UnlockUserError::MySqlError(err.to_string())); .map_err(|err| UnlockUserError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!("Failed to unlock database user '{}': {:?}", &db_user, err);
}
results.insert(db_user, result); results.insert(db_user, result);
} }
@ -298,39 +340,55 @@ pub async fn unlock_database_users(
/// This struct contains information about a database user. /// This struct contains information about a database user.
/// This can be extended if we need more information in the future. /// This can be extended if we need more information in the future.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, FromRow)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DatabaseUser { pub struct DatabaseUser {
#[sqlx(rename = "User")]
pub user: String, pub user: String,
#[allow(dead_code)]
#[serde(skip)] #[serde(skip)]
#[sqlx(rename = "Host")]
pub host: String, pub host: String,
#[sqlx(rename = "has_password")]
pub has_password: bool, pub has_password: bool,
#[sqlx(rename = "is_locked")]
pub is_locked: bool, pub is_locked: bool,
#[sqlx(skip)]
pub databases: Vec<String>, pub databases: Vec<String>,
} }
/// Some mysql versions with some collations mark some columns as binary fields,
/// which in the current version of sqlx is not parsable as string.
/// See: https://github.com/launchbadge/sqlx/issues/3387
#[inline]
fn try_get_with_binary_fallback(
row: &sqlx::mysql::MySqlRow,
column: &str,
) -> Result<String, sqlx::Error> {
row.try_get(column).or_else(|_| {
row.try_get::<Vec<u8>, _>(column)
.map(|v| String::from_utf8_lossy(&v).to_string())
})
}
impl FromRow<'_, sqlx::mysql::MySqlRow> for DatabaseUser {
fn from_row(row: &sqlx::mysql::MySqlRow) -> Result<Self, sqlx::Error> {
Ok(Self {
user: try_get_with_binary_fallback(row, "User")?,
host: try_get_with_binary_fallback(row, "Host")?,
has_password: row.try_get("has_password")?,
is_locked: row.try_get("is_locked")?,
databases: Vec::new(),
})
}
}
const DB_USER_SELECT_STATEMENT: &str = r#" const DB_USER_SELECT_STATEMENT: &str = r#"
SELECT SELECT
`mysql`.`user`.`User`, `user`.`User`,
`mysql`.`user`.`Host`, `user`.`Host`,
`mysql`.`user`.`Password` != '' OR `mysql`.`user`.`authentication_string` != '' AS `has_password`, `user`.`Password` != '' OR `user`.`authentication_string` != '' AS `has_password`,
COALESCE( COALESCE(
JSON_EXTRACT(`mysql`.`global_priv`.`priv`, "$.account_locked"), JSON_EXTRACT(`global_priv`.`priv`, "$.account_locked"),
'false' 'false'
) != 'false' AS `is_locked` ) != 'false' AS `is_locked`
FROM `mysql`.`user` FROM `user`
JOIN `mysql`.`global_priv` ON JOIN `global_priv` ON
`mysql`.`user`.`User` = `mysql`.`global_priv`.`User` `user`.`User` = `global_priv`.`User`
AND `mysql`.`user`.`Host` = `mysql`.`global_priv`.`Host` AND `user`.`Host` = `global_priv`.`Host`
"#; "#;
pub async fn list_database_users( pub async fn list_database_users(
@ -358,6 +416,10 @@ pub async fn list_database_users(
.fetch_optional(&mut *connection) .fetch_optional(&mut *connection)
.await; .await;
if let Err(err) = &result {
log::error!("Failed to list database user '{}': {:?}", &db_user, err);
}
if let Ok(Some(user)) = result.as_mut() { if let Ok(Some(user)) = result.as_mut() {
append_databases_where_user_has_privileges(user, &mut *connection).await; append_databases_where_user_has_privileges(user, &mut *connection).await;
} }
@ -377,13 +439,17 @@ pub async fn list_all_database_users_for_unix_user(
connection: &mut MySqlConnection, connection: &mut MySqlConnection,
) -> ListAllUsersOutput { ) -> ListAllUsersOutput {
let mut result = sqlx::query_as::<_, DatabaseUser>( let mut result = sqlx::query_as::<_, DatabaseUser>(
&(DB_USER_SELECT_STATEMENT.to_string() + "WHERE `mysql`.`user`.`User` REGEXP ?"), &(DB_USER_SELECT_STATEMENT.to_string() + "WHERE `user`.`User` REGEXP ?"),
) )
.bind(create_user_group_matching_regex(unix_user)) .bind(create_user_group_matching_regex(unix_user))
.fetch_all(&mut *connection) .fetch_all(&mut *connection)
.await .await
.map_err(|err| ListAllUsersError::MySqlError(err.to_string())); .map_err(|err| ListAllUsersError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!("Failed to list all database users: {:?}", err);
}
if let Ok(users) = result.as_mut() { if let Ok(users) = result.as_mut() {
for user in users { for user in users {
append_databases_where_user_has_privileges(user, &mut *connection).await; append_databases_where_user_has_privileges(user, &mut *connection).await;
@ -394,15 +460,15 @@ pub async fn list_all_database_users_for_unix_user(
} }
pub async fn append_databases_where_user_has_privileges( pub async fn append_databases_where_user_has_privileges(
database_user: &mut DatabaseUser, db_user: &mut DatabaseUser,
connection: &mut MySqlConnection, connection: &mut MySqlConnection,
) { ) {
let database_list = sqlx::query( let database_list = sqlx::query(
formatdoc!( formatdoc!(
r#" r#"
SELECT `db` AS `database` SELECT `Db` AS `database`
FROM `db` FROM `db`
WHERE `user` = ? AND ({}) WHERE `User` = ? AND ({})
"#, "#,
DATABASE_PRIVILEGE_FIELDS DATABASE_PRIVILEGE_FIELDS
.iter() .iter()
@ -411,14 +477,22 @@ pub async fn append_databases_where_user_has_privileges(
) )
.as_str(), .as_str(),
) )
.bind(database_user.user.clone()) .bind(db_user.user.clone())
.fetch_all(&mut *connection) .fetch_all(&mut *connection)
.await; .await;
database_user.databases = database_list if let Err(err) = &database_list {
log::error!(
"Failed to list databases for user '{}': {:?}",
&db_user.user,
err
);
}
db_user.databases = database_list
.map(|rows| { .map(|rows| {
rows.into_iter() rows.into_iter()
.map(|row| row.get::<String, _>("database")) .map(|row| try_get_with_binary_fallback(&row, "database").unwrap())
.collect() .collect()
}) })
.unwrap_or_default(); .unwrap_or_default();