Switch from log to tracing

This commit is contained in:
2025-11-30 20:42:10 +09:00
parent 7f5c3310db
commit 2472936857
11 changed files with 217 additions and 192 deletions

View File

@@ -3,7 +3,7 @@ use std::path::PathBuf;
use anyhow::Context;
use clap::Parser;
use clap_verbosity_flag::Verbosity;
use systemd_journal_logger::JournalLog;
use tracing_subscriber::prelude::*;
use crate::{core::common::DEFAULT_CONFIG_PATH, server::supervisor::Supervisor};
@@ -50,29 +50,39 @@ pub async fn handle_command(
false
}
};
if systemd_mode {
JournalLog::new()
.context("Failed to initialize journald logging")?
.install()
.context("Failed to install journald logger")?;
let subscriber = tracing_subscriber::Registry::default()
.with(verbosity.tracing_level_filter())
.with(tracing_journald::layer()?);
log::set_max_level(verbosity.log_level_filter());
tracing::subscriber::set_global_default(subscriber)
.context("Failed to set global default tracing subscriber")?;
if verbosity.log_level_filter() >= log::LevelFilter::Trace {
log::warn!("{}", LOG_LEVEL_WARNING.trim());
if verbosity.tracing_level_filter() >= tracing::Level::TRACE {
tracing::warn!("{}", LOG_LEVEL_WARNING.trim());
}
if auto_detected_systemd_mode {
log::info!("Running in systemd mode, auto-detected");
tracing::info!("Running in systemd mode, auto-detected");
} else {
log::info!("Running in systemd mode");
tracing::info!("Running in systemd mode");
}
} else {
env_logger::Builder::new()
.filter_level(verbosity.log_level_filter())
.init();
let subscriber = tracing_subscriber::Registry::default()
.with(verbosity.tracing_level_filter())
.with(
tracing_subscriber::fmt::layer()
.with_line_number(cfg!(debug_assertions))
.with_target(cfg!(debug_assertions))
.with_thread_ids(false)
.with_thread_names(false),
);
log::info!("Running in standalone mode");
tracing::subscriber::set_global_default(subscriber)
.context("Failed to set global default tracing subscriber")?;
tracing::info!("Running in standalone mode");
}
let config_path = config_path.unwrap_or_else(|| PathBuf::from(DEFAULT_CONFIG_PATH));

View File

@@ -35,7 +35,7 @@ impl MysqlConfig {
pub fn as_mysql_connect_options(&self) -> anyhow::Result<MySqlConnectOptions> {
let mut options = MySqlConnectOptions::new()
.database("mysql")
.log_statements(log::LevelFilter::Trace);
.log_statements(tracing::log::LevelFilter::Trace);
if let Some(username) = &self.username {
options = options.username(username);
@@ -63,7 +63,7 @@ impl MysqlConfig {
.password
.as_ref()
.map(|_| "<REDACTED>".to_owned());
log::debug!(
tracing::debug!(
"Connecting to MySQL server with parameters: {:#?}",
display_config
);
@@ -79,7 +79,7 @@ pub struct ServerConfig {
impl ServerConfig {
/// Reads the server configuration from the specified path, or the default path if none is provided.
pub fn read_config_from_path(config_path: &Path) -> anyhow::Result<Self> {
log::debug!("Reading config file at {:?}", config_path);
tracing::debug!("Reading config file at {:?}", config_path);
fs::read_to_string(config_path)
.context(format!("Failed to read config file at {:?}", config_path))

View File

@@ -40,7 +40,7 @@ pub async fn session_handler(
let uid = match socket.peer_cred() {
Ok(cred) => cred.uid(),
Err(e) => {
log::error!("Failed to get peer credentials from socket: {}", e);
tracing::error!("Failed to get peer credentials from socket: {}", e);
let mut message_stream = create_server_to_client_message_stream(socket);
message_stream
.send(Response::Error(
@@ -56,12 +56,12 @@ pub async fn session_handler(
}
};
log::debug!("Validated peer UID: {}", uid);
tracing::debug!("Validated peer UID: {}", uid);
let unix_user = match UnixUser::from_uid(uid) {
Ok(user) => user,
Err(e) => {
log::error!("Failed to get username from uid: {}", e);
tracing::error!("Failed to get username from uid: {}", e);
let mut message_stream = create_server_to_client_message_stream(socket);
message_stream
.send(Response::Error(
@@ -87,7 +87,7 @@ pub async fn session_handler_with_unix_user(
) -> anyhow::Result<()> {
let mut message_stream = create_server_to_client_message_stream(socket);
log::debug!("Requesting database connection from pool");
tracing::debug!("Requesting database connection from pool");
let mut db_connection = match db_pool.read().await.acquire().await {
Ok(connection) => connection,
Err(err) => {
@@ -104,12 +104,12 @@ pub async fn session_handler_with_unix_user(
return Err(err.into());
}
};
log::debug!("Successfully acquired database connection from pool");
tracing::debug!("Successfully acquired database connection from pool");
let result =
session_handler_with_db_connection(message_stream, unix_user, &mut db_connection).await;
log::debug!("Releasing database connection back to pool");
tracing::debug!("Releasing database connection back to pool");
result
}
@@ -131,7 +131,7 @@ async fn session_handler_with_db_connection(
Some(Ok(request)) => request,
Some(Err(e)) => return Err(e.into()),
None => {
log::warn!("Client disconnected without sending an exit message");
tracing::warn!("Client disconnected without sending an exit message");
break;
}
};
@@ -143,7 +143,7 @@ async fn session_handler_with_db_connection(
}
request => request.to_owned(),
};
log::info!("Received request: {:#?}", request_to_display);
tracing::info!("Received request: {:#?}", request_to_display);
let response = match request {
Request::CheckAuthorization(dbs_or_users) => {
@@ -237,11 +237,11 @@ async fn session_handler_with_db_connection(
}
response => response.to_owned(),
};
log::info!("Response: {:#?}", response_to_display);
tracing::info!("Response: {:#?}", response_to_display);
stream.send(response).await?;
stream.flush().await?;
log::debug!("Successfully processed request");
tracing::debug!("Successfully processed request");
}
Ok(())

View File

@@ -33,7 +33,7 @@ pub(super) async fn unsafe_database_exists(
.await;
if let Err(err) = &result {
log::error!(
tracing::error!(
"Failed to check if database '{}' exists: {:?}",
&database_name,
err
@@ -93,7 +93,7 @@ pub async fn create_databases(
.map_err(|err| CreateDatabaseError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!("Failed to create database '{}': {:?}", &database_name, err);
tracing::error!("Failed to create database '{}': {:?}", &database_name, err);
}
results.insert(database_name, result);
@@ -152,7 +152,7 @@ pub async fn drop_databases(
.map_err(|err| DropDatabaseError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!("Failed to drop database '{}': {:?}", &database_name, err);
tracing::error!("Failed to drop database '{}': {:?}", &database_name, err);
}
results.insert(database_name, result);
@@ -216,7 +216,7 @@ pub async fn list_databases(
});
if let Err(err) = &result {
log::error!("Failed to list database '{}': {:?}", &database_name, err);
tracing::error!("Failed to list database '{}': {:?}", &database_name, err);
}
results.insert(database_name, result);
@@ -243,7 +243,7 @@ pub async fn list_all_databases_for_user(
.map_err(|err| ListAllDatabasesError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!(
tracing::error!(
"Failed to list databases for user '{}': {:?}",
unix_user.username,
err

View File

@@ -50,7 +50,7 @@ fn get_mysql_row_priv_field(row: &MySqlRow, position: usize) -> Result<bool, sql
match rev_yn(value) {
Some(val) => Ok(val),
_ => {
log::warn!(r#"Invalid value for privilege "{}": '{}'"#, field, value);
tracing::warn!(r#"Invalid value for privilege "{}": '{}'"#, field, value);
Ok(false)
}
}
@@ -94,7 +94,7 @@ async fn unsafe_get_database_privileges(
.await;
if let Err(e) = &result {
log::error!(
tracing::error!(
"Failed to get database privileges for '{}': {}",
&database_name,
e
@@ -124,7 +124,7 @@ pub async fn unsafe_get_database_privileges_for_db_user_pair(
.await;
if let Err(e) = &result {
log::error!(
tracing::error!(
"Failed to get database privileges for '{}.{}': {}",
&database_name,
&user_name,
@@ -206,7 +206,7 @@ pub async fn get_all_database_privileges(
.map_err(|e| GetAllDatabasesPrivilegeDataError::MySqlError(e.to_string()));
if let Err(e) = &result {
log::error!("Failed to get all database privileges: {:?}", e);
tracing::error!("Failed to get all database privileges: {:?}", e);
}
result
@@ -298,7 +298,7 @@ async fn unsafe_apply_privilege_diff(
};
if let Err(e) = &result {
log::error!("Failed to apply database privilege diff: {}", e);
tracing::error!("Failed to apply database privilege diff: {}", e);
}
result
@@ -380,7 +380,7 @@ async fn validate_diff(
}
}
DatabasePrivilegesDiff::Noop { .. } => {
log::warn!(
tracing::warn!(
"Server got sent a noop database privilege diff to validate, is the client buggy?"
);
Ok(())

View File

@@ -45,7 +45,7 @@ async fn unsafe_user_exists(
.map(|row| row.get::<bool, _>(0));
if let Err(err) = &result {
log::error!("Failed to check if database user exists: {:?}", err);
tracing::error!("Failed to check if database user exists: {:?}", err);
}
result
@@ -88,7 +88,7 @@ pub async fn create_database_users(
.map_err(|err| CreateUserError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!("Failed to create database user '{}': {:?}", &db_user, err);
tracing::error!("Failed to create database user '{}': {:?}", &db_user, err);
}
results.insert(db_user, result);
@@ -134,7 +134,7 @@ pub async fn drop_database_users(
.map_err(|err| DropUserError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!("Failed to drop database user '{}': {:?}", &db_user, err);
tracing::error!("Failed to drop database user '{}': {:?}", &db_user, err);
}
results.insert(db_user, result);
@@ -177,7 +177,7 @@ pub async fn set_password_for_database_user(
.map_err(|err| SetPasswordError::MySqlError(err.to_string()));
if result.is_err() {
log::error!(
tracing::error!(
"Failed to set password for database user '{}': <REDACTED>",
&db_user,
);
@@ -208,7 +208,7 @@ async fn database_user_is_locked_unsafe(
.map(|row| row.get::<bool, _>(0));
if let Err(err) = &result {
log::error!(
tracing::error!(
"Failed to check if database user is locked '{}': {:?}",
&db_user,
err
@@ -269,7 +269,7 @@ pub async fn lock_database_users(
.map_err(|err| LockUserError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!("Failed to lock database user '{}': {:?}", &db_user, err);
tracing::error!("Failed to lock database user '{}': {:?}", &db_user, err);
}
results.insert(db_user, result);
@@ -329,7 +329,7 @@ pub async fn unlock_database_users(
.map_err(|err| UnlockUserError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!("Failed to unlock database user '{}': {:?}", &db_user, err);
tracing::error!("Failed to unlock database user '{}': {:?}", &db_user, err);
}
results.insert(db_user, result);
@@ -403,7 +403,7 @@ pub async fn list_database_users(
.await;
if let Err(err) = &result {
log::error!("Failed to list database user '{}': {:?}", &db_user, err);
tracing::error!("Failed to list database user '{}': {:?}", &db_user, err);
}
if let Ok(Some(user)) = result.as_mut() {
@@ -433,7 +433,7 @@ pub async fn list_all_database_users_for_unix_user(
.map_err(|err| ListAllUsersError::MySqlError(err.to_string()));
if let Err(err) = &result {
log::error!("Failed to list all database users: {:?}", err);
tracing::error!("Failed to list all database users: {:?}", err);
}
if let Ok(users) = result.as_mut() {
@@ -468,7 +468,7 @@ pub async fn append_databases_where_user_has_privileges(
.await;
if let Err(err) = &database_list {
log::error!(
tracing::error!(
"Failed to list databases for user '{}': {:?}",
&db_user.user,
err

View File

@@ -56,8 +56,8 @@ pub struct Supervisor {
impl Supervisor {
pub async fn new(config_path: PathBuf, systemd_mode: bool) -> anyhow::Result<Self> {
log::debug!("Starting server supervisor");
log::debug!(
tracing::debug!("Starting server supervisor");
tracing::debug!(
"Running in tokio with {} worker threads",
tokio::runtime::Handle::current().metrics().num_workers()
);
@@ -70,13 +70,13 @@ impl Supervisor {
let watchdog_task =
if systemd_mode && sd_notify::watchdog_enabled(true, &mut watchdog_micro_seconds) {
watchdog_duration = Some(Duration::from_micros(watchdog_micro_seconds));
log::debug!(
tracing::debug!(
"Systemd watchdog enabled with {} millisecond interval",
watchdog_micro_seconds.div_ceil(1000),
);
Some(spawn_watchdog_task(watchdog_duration.unwrap()))
} else {
log::debug!("Systemd watchdog not enabled, skipping watchdog thread");
tracing::debug!("Systemd watchdog not enabled, skipping watchdog thread");
None
};
@@ -133,7 +133,7 @@ impl Supervisor {
})
}
async fn stop_receiving_new_connections(&self) -> anyhow::Result<()> {
fn stop_receiving_new_connections(&self) -> anyhow::Result<()> {
self.handler_task_tracker.close();
self.supervisor_message_sender
.send(SupervisorMessage::StopAcceptingNewConnections)
@@ -141,7 +141,7 @@ impl Supervisor {
Ok(())
}
async fn resume_receiving_new_connections(&self) -> anyhow::Result<()> {
fn resume_receiving_new_connections(&self) -> anyhow::Result<()> {
self.handler_task_tracker.reopen();
self.supervisor_message_sender
.send(SupervisorMessage::ResumeAcceptingNewConnections)
@@ -194,30 +194,30 @@ impl Supervisor {
// NOTE: despite closing the existing db pool, any already acquired connections will remain valid until dropped,
// so we don't need to close existing connections here.
if self.config.lock().await.mysql != previous_config.mysql {
log::debug!("MySQL configuration has changed");
tracing::debug!("MySQL configuration has changed");
log::debug!("Restarting database connection pool with new configuration");
tracing::debug!("Restarting database connection pool with new configuration");
self.restart_db_connection_pool().await?;
}
if self.config.lock().await.socket_path != previous_config.socket_path {
log::debug!("Socket path configuration has changed, reloading listener");
tracing::debug!("Socket path configuration has changed, reloading listener");
if !listener_task_was_stopped {
listener_task_was_stopped = true;
log::debug!("Stop accepting new connections");
self.stop_receiving_new_connections().await?;
tracing::debug!("Stop accepting new connections");
self.stop_receiving_new_connections()?;
log::debug!("Waiting for existing connections to finish");
tracing::debug!("Waiting for existing connections to finish");
self.wait_for_existing_connections_to_finish().await?;
}
log::debug!("Reloading listener with new socket path");
tracing::debug!("Reloading listener with new socket path");
self.reload_listener().await?;
}
if listener_task_was_stopped {
log::debug!("Resuming listener task");
self.resume_receiving_new_connections().await?;
tracing::debug!("Resuming listener task");
self.resume_receiving_new_connections()?;
}
sd_notify::notify(false, &[sd_notify::NotifyState::Ready])?;
@@ -228,31 +228,28 @@ impl Supervisor {
pub async fn shutdown(&self) -> anyhow::Result<()> {
sd_notify::notify(false, &[sd_notify::NotifyState::Stopping])?;
log::debug!("Stop accepting new connections");
self.stop_receiving_new_connections().await?;
tracing::debug!("Stop accepting new connections");
self.stop_receiving_new_connections()?;
let connection_count = self.handler_task_tracker.len();
log::debug!(
tracing::debug!(
"Waiting for {} existing connections to finish",
connection_count
);
self.wait_for_existing_connections_to_finish().await?;
log::debug!("Shutting down listener task");
tracing::debug!("Shutting down listener task");
self.supervisor_message_sender
.send(SupervisorMessage::Shutdown)
.unwrap_or_else(|e| {
log::warn!(
"Failed to send shutdown message to listener task: {}",
e
);
tracing::warn!("Failed to send shutdown message to listener task: {}", e);
0
});
log::debug!("Shutting down database connection pool");
tracing::debug!("Shutting down database connection pool");
self.db_connection_pool.read().await.close().await;
log::debug!("Server shutdown complete");
tracing::debug!("Server shutdown complete");
std::process::exit(0);
}
@@ -266,19 +263,19 @@ impl Supervisor {
let mut rx = self.reload_message_receiver.resubscribe();
rx.recv().await
} => {
log::info!("Reloading configuration");
tracing::info!("Reloading configuration");
match self.reload().await {
Ok(()) => {
log::info!("Configuration reloaded successfully");
tracing::info!("Configuration reloaded successfully");
}
Err(e) => {
log::error!("Failed to reload configuration: {}", e);
tracing::error!("Failed to reload configuration: {}", e);
}
}
}
_ = self.shutdown_cancel_token.cancelled() => {
log::info!("Shutting down server");
tracing::info!("Shutting down server");
self.shutdown().await?;
break;
}
@@ -292,14 +289,14 @@ impl Supervisor {
fn spawn_watchdog_task(duration: Duration) -> JoinHandle<()> {
tokio::spawn(async move {
let mut interval = interval(duration.div_f32(2.0));
log::debug!(
tracing::debug!(
"Starting systemd watchdog task, pinging every {} milliseconds",
duration.div_f32(2.0).as_millis()
);
loop {
interval.tick().await;
if let Err(err) = sd_notify::notify(false, &[sd_notify::NotifyState::Watchdog]) {
log::warn!("Failed to notify systemd watchdog: {}", err);
tracing::warn!("Failed to notify systemd watchdog: {}", err);
}
}
})
@@ -323,7 +320,7 @@ fn spawn_status_notifier_task(task_tracker: TaskTracker) -> JoinHandle<()> {
if let Err(e) =
sd_notify::notify(false, &[sd_notify::NotifyState::Status(message.as_str())])
{
log::warn!("Failed to send systemd status notification: {}", e);
tracing::warn!("Failed to send systemd status notification: {}", e);
}
}
})
@@ -334,11 +331,11 @@ async fn create_unix_listener_with_socket_path(
) -> anyhow::Result<TokioUnixListener> {
let parent_directory = socket_path.parent().unwrap();
if !parent_directory.exists() {
log::debug!("Creating directory {:?}", parent_directory);
tracing::debug!("Creating directory {:?}", parent_directory);
fs::create_dir_all(parent_directory)?;
}
log::info!("Listening on socket {:?}", socket_path);
tracing::info!("Listening on socket {:?}", socket_path);
match fs::remove_file(socket_path.as_path()) {
Ok(_) => {}
@@ -359,7 +356,7 @@ async fn create_unix_listener_with_systemd_socket() -> anyhow::Result<TokioUnixL
debug_assert!(fd == 3, "Unexpected file descriptor from systemd: {}", fd);
log::debug!(
tracing::debug!(
"Received file descriptor from systemd with id: '{}', assuming socket",
fd
);
@@ -390,7 +387,7 @@ async fn create_db_connection_pool(config: &MysqlConfig) -> anyhow::Result<MySql
}?;
let pool_opts = pool.options();
log::debug!(
tracing::debug!(
"Successfully opened database connection pool with options (max_connections: {}, min_connections: {})",
pool_opts.get_max_connections(),
pool_opts.get_min_connections(),
@@ -414,11 +411,11 @@ fn spawn_signal_handler_task(
loop {
tokio::select! {
_ = sighup_stream.recv() => {
log::info!("Received SIGHUP signal");
tracing::info!("Received SIGHUP signal");
reload_sender.send(ReloadEvent).ok();
}
_ = sigterm_stream.recv() => {
log::info!("Received SIGTERM signal");
tracing::info!("Received SIGTERM signal");
shutdown_token.cancel();
break;
}
@@ -442,16 +439,16 @@ async fn listener_task(
Ok(message) = supervisor_message_receiver.recv() => {
match message {
SupervisorMessage::StopAcceptingNewConnections => {
log::info!("Listener task received stop accepting new connections message, stopping listener");
tracing::info!("Listener task received stop accepting new connections message, stopping listener");
while let Ok(msg) = supervisor_message_receiver.try_recv() {
if let SupervisorMessage::ResumeAcceptingNewConnections = msg {
log::info!("Listener task received resume accepting new connections message, resuming listener");
tracing::info!("Listener task received resume accepting new connections message, resuming listener");
break;
}
}
}
SupervisorMessage::Shutdown => {
log::info!("Listener task received shutdown message, exiting listener task");
tracing::info!("Listener task received shutdown message, exiting listener task");
break;
}
_ => {}
@@ -464,20 +461,20 @@ async fn listener_task(
} => {
match accept_result {
Ok((conn, _addr)) => {
log::debug!("Got new connection");
tracing::debug!("Got new connection");
let db_pool_clone = db_pool.clone();
task_tracker.spawn(async {
match session_handler(conn, db_pool_clone).await {
Ok(()) => {}
Err(e) => {
log::error!("Failed to run server: {}", e);
tracing::error!("Failed to run server: {}", e);
}
}
});
}
Err(e) => {
log::error!("Failed to accept new connection: {}", e);
tracing::error!("Failed to accept new connection: {}", e);
}
}
}