rest-endpoint: init

This commit is contained in:
2025-12-22 19:59:31 +09:00
parent 00793bed2b
commit e719840f72
8 changed files with 2939 additions and 7 deletions

3
.gitignore vendored
View File

@@ -1,2 +1,5 @@
result
result-*
target
config.toml

View File

@@ -1,12 +1,12 @@
#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages(ps: with ps; [ psycopg2-bin ])"
from argparse import ArgumentParser, Action
import os
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
import gzip
import os
from argparse import Action, ArgumentParser
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import psycopg2
from psycopg2.extras import execute_values
@@ -85,7 +85,7 @@ def insert_sessions_into_db(
with conn:
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS "minecraft_login_sessions"(
CREATE TABLE IF NOT EXISTS "minecraft_login_session"(
"username" TEXT NOT NULL,
"start" TIMESTAMP NOT NULL,
"duration" BIGINT NOT NULL CHECK (duration >= 0),
@@ -96,7 +96,7 @@ def insert_sessions_into_db(
execute_values(
cur,
"""
INSERT INTO minecraft_login_sessions(username, start, duration)
INSERT INTO minecraft_login_session(username, start, duration)
VALUES %s
ON CONFLICT DO NOTHING
""",

2421
rest-endpoint/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

19
rest-endpoint/Cargo.toml Normal file
View File

@@ -0,0 +1,19 @@
[package]
name = "minecraft-heatmap-rest-endpoint"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow = "1.0.100"
axum = "0.8.8"
chrono = { version = "0.4.42", features = ["serde"] }
clap = { version = "4.5.53", features = ["derive"] }
indoc = "2.0.7"
itertools = "0.14.0"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
sqlx = { version = "0.8.6", features = ["bigdecimal", "chrono", "postgres", "runtime-tokio"] }
tokio = { version = "1.48.0", features = ["rt-multi-thread"] }
toml = "0.9.10"
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }

114
rest-endpoint/src/config.rs Normal file
View File

@@ -0,0 +1,114 @@
use serde::{Deserialize, Serialize};
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Config {
pub database: DatabaseConfig,
pub web_server: WebServerConfig,
}
fn default_db_host() -> String {
"localhost".to_string()
}
fn default_db_port() -> u16 {
5432
}
fn default_db_username() -> String {
"minecraft_heatmap".to_string()
}
fn default_db_name() -> String {
"minecraft_heatmap".to_string()
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DatabaseConfig {
/// The database host address.
#[serde(default = "default_db_host")]
pub host: String,
/// The database port number.
#[serde(default = "default_db_port")]
pub port: u16,
/// The username for database authentication.
#[serde(default = "default_db_username")]
pub username: String,
/// The password for database authentication.
pub password: Option<String>,
/// The path to a file containing the database password.
pub password_file: Option<String>,
/// The name of the database to connect to.
#[serde(default = "default_db_name")]
pub database_name: String,
}
impl DatabaseConfig {
/// Constructs the database connection URL.
pub fn to_database_url(&self) -> String {
let password_part = if let Some(ref password) = self.password {
format!(":{}", password)
} else if let Some(ref password_file) = self.password_file {
let pwd = std::fs::read_to_string(password_file)
.unwrap_or_default()
.trim()
.to_string();
format!(":{}", pwd)
} else {
"".to_string()
};
format!(
"postgresql://{}{}@{}:{}/{}",
self.username, password_part, self.host, self.port, self.database_name,
)
}
}
fn default_webserver_host() -> String {
"localhost".to_string()
}
fn default_webserver_port() -> u16 {
8080
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct WebServerConfig {
/// The web server host address.
#[serde(default = "default_webserver_host")]
pub host: String,
/// The web server port number.
#[serde(default = "default_webserver_port")]
pub port: u16,
}
impl WebServerConfig {
/// Tries to resolve the [`host`](Self::host) to an IP address.
pub fn resolve_hostname(&self) -> anyhow::Result<IpAddr> {
let addr = format!("{}:{}", self.host, self.port);
let socket_addrs: Vec<SocketAddr> = addr.to_socket_addrs()?.collect();
if let Some(first_addr) = socket_addrs.first() {
Ok(first_addr.ip())
} else {
Err(anyhow::anyhow!("Could not resolve hostname: {}", self.host))
}
}
/// Resolves the host and port to a [`SocketAddr`](std::net::SocketAddr).
pub fn get_socket_addr(&self) -> anyhow::Result<SocketAddr> {
let addr = format!("{}:{}", self.host, self.port);
let socket_addrs: Vec<SocketAddr> = addr.to_socket_addrs()?.collect();
if let Some(first_addr) = socket_addrs.first() {
Ok(*first_addr)
} else {
Err(anyhow::anyhow!("Could not resolve hostname: {}", self.host))
}
}
}

155
rest-endpoint/src/main.rs Normal file
View File

@@ -0,0 +1,155 @@
pub mod config;
pub mod queries;
pub mod web_api;
use crate::config::Config;
use anyhow::Context;
use axum::Router;
use clap::Parser;
use itertools::Itertools;
use sqlx::{Executor, postgres::PgPoolOptions};
#[derive(clap::Parser, Debug, Clone)]
struct Args {
/// Path to the configuration file.
#[arg(short, long, default_value = "./config.toml")]
config_path: String,
#[command(subcommand)]
command: Commands,
}
#[derive(clap::Subcommand, Debug, Clone)]
pub enum Commands {
/// Serve the REST API
Serve,
/// Connect to the database, create tables if they do not exist, and seed with test data
SeedTestData,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::fmt()
.with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
.with_thread_ids(true)
.with_thread_names(true)
.init();
let args = Args::parse();
tracing::info!("Reading configuration at './config.toml'");
let config_content = std::fs::read_to_string(&args.config_path).context(format!(
"Failed to read configuration file at '{}'",
&args.config_path
))?;
let config: Config =
toml::from_str(&config_content).context("Failed to parse configuration")?;
tracing::info!(
"Starting database connection pool to '{}@{}:{}/{}'",
config.database.username,
config.database.host,
config.database.port,
config.database.database_name,
);
let db_pool = PgPoolOptions::new()
.max_connections(5)
.after_connect({
move |conn, _meta| {
Box::pin(async move {
tracing::debug!("Configuring database connection...");
conn.execute("SET TIME ZONE 'UTC';").await?;
conn.execute("SET client_encoding = 'UTF8';").await?;
conn.execute("SET application_name = 'minecraft-heatmap-rest-endpoint';")
.await?;
tracing::debug!("Database connection configured.");
Ok(())
})
}
})
.connect(&config.database.to_database_url())
.await
.context("Failed to connect to the database")?;
tracing::info!("Database connection pool established.");
match args.command {
Commands::Serve => serve_api(config, db_pool).await,
Commands::SeedTestData => seed_test_data(config, db_pool).await,
}
}
async fn serve_api(config: Config, db_pool: sqlx::PgPool) -> anyhow::Result<()> {
tracing::info!(
"Starting web server at {}:{}",
config.web_server.host,
config.web_server.port
);
let app = Router::new().nest("/api/v1", web_api::create_router(db_pool).await);
let socket_address = config
.web_server
.get_socket_addr()
.context("Could not get socket address for web server")?;
let listener = tokio::net::TcpListener::bind(socket_address)
.await
.context("Failed to bind to the specified address")?;
axum::serve(listener, app.into_make_service())
.await
.context("Failed to start the web server")?;
Ok(())
}
async fn seed_test_data(_config: Config, db_pool: sqlx::PgPool) -> anyhow::Result<()> {
tracing::info!("Creating tables if not exists...");
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS "minecraft_heatmap"."minecraft_login_session"(
"username" TEXT NOT NULL,
"start" TIMESTAMP NOT NULL,
"duration" BIGINT NOT NULL CHECK (duration >= 0),
PRIMARY KEY ("username", "start")
)
"#,
)
.execute(&db_pool)
.await
.expect("Failed to create tables");
tracing::info!("Inserting test data...");
let test_data = vec![
("player1", "2023-10-01 10:00:00", 3600),
("player2", "2023-10-01 11:00:00", 1800),
("player1", "2023-10-02 09:30:00", 7200),
("player3", "2023-10-02 12:15:00", 4500),
];
let (players, starts, durations): (Vec<&str>, Vec<&str>, Vec<i32>) =
test_data.iter().cloned().multiunzip();
sqlx::query(
r#"
INSERT INTO "minecraft_heatmap"."minecraft_login_session" (username, start, duration)
SELECT UNNEST($1::TEXT[]), UNNEST($2::TIMESTAMP[]), UNNEST($3::BIGINT[]);
"#,
)
.bind(&players)
.bind(&starts)
.bind(&durations)
.execute(&db_pool)
.await
.expect("Failed to insert test data");
Ok(())
}

View File

@@ -0,0 +1,119 @@
use std::collections::HashMap;
use indoc::indoc;
use serde::Deserialize;
use serde::Serialize;
use sqlx::FromRow;
use sqlx::PgPool;
// use serde::{Deserialize, Serialize};
use sqlx::QueryBuilder;
/// Represents a single user login session.
#[derive(FromRow, Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct LoginSession {
/// The username of the user.
pub username: String,
/// The start time of the login session.
pub start: chrono::NaiveDateTime,
/// The duration of the login session in seconds.
pub duration: i64,
}
async fn filter_time(
query_builder: &mut QueryBuilder<'_, sqlx::Postgres>,
start_time: Option<chrono::NaiveDateTime>,
end_time: Option<chrono::NaiveDateTime>,
) {
if let Some(start) = start_time {
query_builder.push(" AND start >= ");
query_builder.push_bind(start);
}
if let Some(end) = end_time {
query_builder.push(" AND start + make_interval(secs => duration) <= ");
query_builder.push_bind(end);
}
}
/// Retrieves login sessions for a specific user within an optional time range.
pub async fn get_login_sessions(
pool: &PgPool,
start_time: Option<chrono::NaiveDateTime>,
end_time: Option<chrono::NaiveDateTime>,
) -> Result<Vec<LoginSession>, sqlx::Error> {
let mut query_builder = QueryBuilder::new(indoc! {r#"
SELECT
username,
start,
duration
FROM
minecraft_heatmap.minecraft_login_session
WHERE
1=1
"#});
filter_time(&mut query_builder, start_time, end_time).await;
query_builder.push(" ORDER BY start ASC");
let query = query_builder.build_query_as::<LoginSession>();
let sessions = query.fetch_all(pool).await?;
Ok(sessions)
}
// Total overall time spent by any user logged in
pub async fn get_total_login_time(
pool: &PgPool,
start_time: Option<chrono::NaiveDateTime>,
end_time: Option<chrono::NaiveDateTime>,
) -> Result<u64, sqlx::Error> {
let mut query_builder = QueryBuilder::new(indoc! {r#"
SELECT
CAST(SUM(duration) AS BIGINT) as total_duration
FROM
minecraft_heatmap.minecraft_login_session
WHERE
1=1
"#});
filter_time(&mut query_builder, start_time, end_time).await;
let query = query_builder.build_query_as::<(Option<i64>,)>();
let result = query.fetch_one(pool).await?;
Ok(result.0.unwrap_or(0) as u64)
}
/// Retrieves total login time per user within an optional time range.
pub async fn get_user_login_times(
pool: &PgPool,
start_time: Option<chrono::NaiveDateTime>,
end_time: Option<chrono::NaiveDateTime>,
) -> Result<HashMap<String, u64>, sqlx::Error> {
let mut query_builder = QueryBuilder::new(indoc! {r#"
SELECT
username,
CAST(SUM(duration) AS BIGINT) as total_duration
FROM
minecraft_heatmap.minecraft_login_session
WHERE
1=1
"#});
filter_time(&mut query_builder, start_time, end_time).await;
query_builder.push(" GROUP BY username ORDER BY total_duration DESC");
let query = query_builder.build_query_as::<(String, Option<i64>)>();
let results = query.fetch_all(pool).await?;
let user_times = results
.into_iter()
.map(|(username, total_duration)| (username, total_duration.unwrap_or(0) as u64))
.collect();
Ok(user_times)
}

View File

@@ -0,0 +1,101 @@
// Create axum api
use axum::{
Json, Router,
extract::{Query, State},
response::IntoResponse,
routing::get,
};
use serde_json::json;
use crate::queries::{get_login_sessions, get_total_login_time, get_user_login_times};
pub async fn create_router(db_pool: sqlx::PgPool) -> Router {
let app = Router::new()
.route("/ping", get(handle_ping))
.route("/login_sessions", get(handle_login_sessions))
.route("/total_login_time", get(handle_total_login_time))
.route("/user_login_time", get(handle_user_login_time))
.with_state(db_pool);
app
}
async fn handle_ping() -> impl IntoResponse {
"Pong!"
}
#[derive(Debug, Clone, serde::Deserialize)]
struct StartEndTime {
start_time: Option<chrono::NaiveDateTime>,
end_time: Option<chrono::NaiveDateTime>,
}
/// A list of all login sessions.
/// Optionally limited to a time range.
async fn handle_login_sessions(
Query(time_filter): Query<StartEndTime>,
State(db_pool): State<sqlx::PgPool>,
) -> impl IntoResponse {
match get_login_sessions(&db_pool, time_filter.start_time, time_filter.end_time).await {
Ok(rows) => {
let response = json!({ "login_sessions": rows });
Json(response).into_response()
}
Err(e) => {
let response =
json!({ "error": format!("Failed to retrieve login sessions: {}", e) });
(
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
Json(response),
)
.into_response()
}
}
}
/// A single number summing up all login time by all users.
/// Optionally limited to a time range.
async fn handle_total_login_time(
Query(time_filter): Query<StartEndTime>,
State(db_pool): State<sqlx::PgPool>,
) -> impl IntoResponse {
match get_total_login_time(&db_pool, time_filter.start_time, time_filter.end_time).await {
Ok(total_time) => {
let response = json!({ "total_login_time_seconds": total_time });
Json(response).into_response()
}
Err(e) => {
let response =
json!({ "error": format!("Failed to retrieve total login time: {}", e) });
(
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
Json(response),
)
.into_response()
}
}
}
/// A list of all users logged in and their total login time.
/// Optionally limited to a time range.
async fn handle_user_login_time(
Query(time_filter): Query<StartEndTime>,
State(db_pool): State<sqlx::PgPool>,
) -> impl IntoResponse {
match get_user_login_times(&db_pool, time_filter.start_time, time_filter.end_time).await {
Ok(user_times) => {
let response = json!({ "user_login_times": user_times });
Json(response).into_response()
}
Err(e) => {
let response =
json!({ "error": format!("Failed to retrieve user login times: {}", e) });
(
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
Json(response),
)
.into_response()
}
}
}