Add panel feature updates across API, daemon, and web

This commit is contained in:
2026-03-02 21:53:54 +00:00
parent 6b463c2b1a
commit afc64b83c1
49 changed files with 7040 additions and 305 deletions
+1
View File
@@ -14,6 +14,7 @@ FROM debian:bookworm-slim AS production
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
libssl3 \
mariadb-client \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
+152
View File
@@ -0,0 +1,152 @@
use std::collections::HashMap;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use anyhow::{anyhow, Result};
use tokio::sync::{mpsc, oneshot, RwLock};
use tracing::{debug, warn};
use crate::server::ServerManager;
const DEFAULT_QUEUE_CAPACITY: usize = 256;
#[derive(Debug)]
struct CommandJob {
command: String,
response_tx: oneshot::Sender<Result<()>>,
}
#[derive(Clone)]
struct WorkerHandle {
id: u64,
sender: mpsc::Sender<CommandJob>,
}
pub struct CommandDispatcher {
server_manager: Arc<ServerManager>,
workers: Arc<RwLock<HashMap<String, WorkerHandle>>>,
next_worker_id: Arc<AtomicU64>,
queue_capacity: usize,
}
impl CommandDispatcher {
pub fn new(server_manager: Arc<ServerManager>) -> Self {
Self {
server_manager,
workers: Arc::new(RwLock::new(HashMap::new())),
next_worker_id: Arc::new(AtomicU64::new(1)),
queue_capacity: DEFAULT_QUEUE_CAPACITY,
}
}
pub async fn send_command(&self, server_uuid: &str, command: &str) -> Result<()> {
let cmd = command.trim();
if cmd.is_empty() {
return Err(anyhow!("Command cannot be empty"));
}
// Retry once if the current worker channel is unexpectedly closed.
for _ in 0..2 {
let worker = self.get_or_create_worker(server_uuid).await;
let (response_tx, response_rx) = oneshot::channel();
let job = CommandJob {
command: cmd.to_string(),
response_tx,
};
match worker.sender.send(job).await {
Ok(_) => {
return response_rx
.await
.unwrap_or_else(|_| Err(anyhow!("Command worker dropped response channel")));
}
Err(send_err) => {
warn!(
server_uuid = %server_uuid,
worker_id = worker.id,
error = %send_err,
"Command worker queue send failed, rotating worker",
);
self.remove_worker_if_matches(server_uuid, worker.id).await;
}
}
}
Err(anyhow!("Failed to dispatch command after retry"))
}
async fn get_or_create_worker(&self, server_uuid: &str) -> WorkerHandle {
if let Some(existing) = self.workers.read().await.get(server_uuid).cloned() {
return existing;
}
let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed);
let (sender, receiver) = mpsc::channel::<CommandJob>(self.queue_capacity);
let handle = WorkerHandle {
id: worker_id,
sender: sender.clone(),
};
{
let mut workers = self.workers.write().await;
if let Some(existing) = workers.get(server_uuid).cloned() {
return existing;
}
workers.insert(server_uuid.to_string(), handle.clone());
}
self.spawn_worker(server_uuid.to_string(), worker_id, receiver);
handle
}
fn spawn_worker(
&self,
server_uuid: String,
worker_id: u64,
mut receiver: mpsc::Receiver<CommandJob>,
) {
let server_manager = self.server_manager.clone();
let workers = self.workers.clone();
tokio::spawn(async move {
debug!(server_uuid = %server_uuid, worker_id, "Command worker started");
while let Some(job) = receiver.recv().await {
let result = execute_command(server_manager.clone(), &server_uuid, &job.command).await;
let _ = job.response_tx.send(result);
}
let mut map = workers.write().await;
if let Some(current) = map.get(&server_uuid) {
if current.id == worker_id {
map.remove(&server_uuid);
}
}
debug!(server_uuid = %server_uuid, worker_id, "Command worker stopped");
});
}
async fn remove_worker_if_matches(&self, server_uuid: &str, worker_id: u64) {
let mut workers = self.workers.write().await;
if let Some(current) = workers.get(server_uuid) {
if current.id == worker_id {
workers.remove(server_uuid);
}
}
}
}
async fn execute_command(
server_manager: Arc<ServerManager>,
server_uuid: &str,
command: &str,
) -> Result<()> {
server_manager
.docker()
.send_command(server_uuid, command)
.await?;
Ok(())
}
+15
View File
@@ -14,6 +14,8 @@ pub struct DaemonConfig {
pub data_path: PathBuf,
#[serde(default = "default_backup_path")]
pub backup_path: PathBuf,
#[serde(default)]
pub managed_mysql: Option<ManagedMysqlConfig>,
}
#[derive(Debug, Deserialize)]
@@ -36,6 +38,19 @@ impl Default for DockerConfig {
}
}
#[derive(Debug, Deserialize, Clone)]
pub struct ManagedMysqlConfig {
pub url: String,
#[serde(default)]
pub connection_host: Option<String>,
#[serde(default)]
pub connection_port: Option<u16>,
#[serde(default)]
pub phpmyadmin_url: Option<String>,
#[serde(default)]
pub bin: Option<String>,
}
fn default_grpc_port() -> u16 {
50051
}
+77 -17
View File
@@ -2,14 +2,14 @@ use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;
use bollard::container::{
Config, CreateContainerOptions, LogsOptions, RemoveContainerOptions, StartContainerOptions,
AttachContainerOptions, Config, CreateContainerOptions, LogsOptions, RemoveContainerOptions, StartContainerOptions,
StopContainerOptions, StatsOptions, Stats,
};
use bollard::image::CreateImageOptions;
use bollard::models::{HostConfig, PortBinding};
use futures::StreamExt;
use tokio::time::{sleep, Duration};
use tracing::info;
use tracing::{debug, info};
use crate::docker::DockerManager;
use crate::server::ServerSpec;
@@ -33,6 +33,65 @@ fn container_data_path_for_image(image: &str) -> &'static str {
}
impl DockerManager {
async fn attach_command_stream(
&self,
container_name: &str,
) -> Result<Arc<crate::docker::manager::CommandStreamHandle>> {
let bollard::container::AttachContainerResults { mut output, input } = self
.client()
.attach_container(
container_name,
Some(AttachContainerOptions::<String> {
stdin: Some(true),
stream: Some(true),
..Default::default()
}),
)
.await?;
let name = container_name.to_string();
let drain_task = tokio::spawn(async move {
while let Some(chunk) = output.next().await {
if let Err(error) = chunk {
debug!(container = %name, error = %error, "Container stdin attach stream closed");
break;
}
}
debug!(container = %name, "Container stdin attach stream ended");
});
Ok(Arc::new(crate::docker::manager::CommandStreamHandle::new(input, drain_task)))
}
async fn get_or_attach_command_stream(
&self,
server_uuid: &str,
) -> Result<Arc<crate::docker::manager::CommandStreamHandle>> {
let name = container_name(server_uuid);
if let Some(existing) = self.command_streams().read().await.get(&name).cloned() {
return Ok(existing);
}
let created = self.attach_command_stream(&name).await?;
let mut streams = self.command_streams().write().await;
if let Some(existing) = streams.get(&name).cloned() {
created.abort();
return Ok(existing);
}
streams.insert(name, created.clone());
Ok(created)
}
async fn clear_command_stream(&self, server_uuid: &str) {
let name = container_name(server_uuid);
if let Some(stream) = self.command_streams().write().await.remove(&name) {
stream.abort();
}
}
async fn run_exec(&self, container_name: &str, cmd: Vec<String>) -> Result<String> {
let exec = self
.client()
@@ -206,6 +265,7 @@ impl DockerManager {
/// Stop a container gracefully.
pub async fn stop_container(&self, server_uuid: &str, timeout_secs: i64) -> Result<()> {
let name = container_name(server_uuid);
self.clear_command_stream(server_uuid).await;
self.client()
.stop_container(
&name,
@@ -221,6 +281,7 @@ impl DockerManager {
/// Kill a container immediately.
pub async fn kill_container(&self, server_uuid: &str) -> Result<()> {
let name = container_name(server_uuid);
self.clear_command_stream(server_uuid).await;
self.client()
.kill_container::<String>(&name, None)
.await?;
@@ -231,6 +292,7 @@ impl DockerManager {
/// Remove a container and its volumes.
pub async fn remove_container(&self, server_uuid: &str) -> Result<()> {
let name = container_name(server_uuid);
self.clear_command_stream(server_uuid).await;
self.client()
.remove_container(
&name,
@@ -338,24 +400,22 @@ impl DockerManager {
})
}
/// Send a command to a container via exec (attach to stdin).
/// Send a command to a container via a persistent Docker attach stdin stream.
pub async fn send_command(&self, server_uuid: &str, command: &str) -> Result<()> {
let name = container_name(server_uuid);
let trimmed = command.trim_end_matches(|ch| ch == '\r' || ch == '\n');
let payload = format!("{trimmed}\n");
// Preferred path for Minecraft-like images where rcon-cli is available.
if self
.run_exec(&name, vec!["rcon-cli".to_string(), command.to_string()])
.await
.is_ok()
{
return Ok(());
for _ in 0..2 {
let stream = self.get_or_attach_command_stream(server_uuid).await?;
match stream.write_all(payload.as_bytes()).await {
Ok(_) => return Ok(()),
Err(error) => {
debug!(server_uuid = %server_uuid, error = %error, "Failed to write to container stdin, resetting attach stream");
self.clear_command_stream(server_uuid).await;
}
}
}
// Generic fallback: write directly to PID 1 stdin.
let escaped = command.replace('\'', "'\"'\"'");
let shell_cmd = format!("printf '%s\\n' '{}' > /proc/1/fd/0", escaped);
self.run_exec(&name, vec!["sh".to_string(), "-c".to_string(), shell_cmd])
.await
.map(|_| ())
Err(anyhow::anyhow!("failed to write command to container stdin"))
}
}
+40
View File
@@ -1,15 +1,50 @@
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use anyhow::Result;
use bollard::Docker;
use bollard::network::CreateNetworkOptions;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use tracing::info;
use crate::config::DockerConfig;
type AttachedInput = Pin<Box<dyn AsyncWrite + Send>>;
pub(crate) struct CommandStreamHandle {
input: Mutex<AttachedInput>,
drain_task: JoinHandle<()>,
}
impl CommandStreamHandle {
pub(crate) fn new(input: AttachedInput, drain_task: JoinHandle<()>) -> Self {
Self {
input: Mutex::new(input),
drain_task,
}
}
pub(crate) async fn write_all(&self, bytes: &[u8]) -> Result<()> {
let mut input = self.input.lock().await;
input.write_all(bytes).await?;
input.flush().await?;
Ok(())
}
pub(crate) fn abort(&self) {
self.drain_task.abort();
}
}
/// Manages the Docker client and network setup.
#[derive(Clone)]
pub struct DockerManager {
client: Docker,
network_name: String,
command_streams: Arc<RwLock<HashMap<String, Arc<CommandStreamHandle>>>>,
}
impl DockerManager {
@@ -30,6 +65,7 @@ impl DockerManager {
let manager = Self {
client,
network_name: config.network.clone(),
command_streams: Arc::new(RwLock::new(HashMap::new())),
};
manager.ensure_network(&config.network_subnet).await?;
@@ -45,6 +81,10 @@ impl DockerManager {
&self.network_name
}
pub(crate) fn command_streams(&self) -> &Arc<RwLock<HashMap<String, Arc<CommandStreamHandle>>>> {
&self.command_streams
}
async fn ensure_network(&self, subnet: &str) -> Result<()> {
let networks = self.client.list_networks::<String>(None).await?;
let exists = networks
+131 -40
View File
@@ -13,9 +13,11 @@ use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tracing::{info, error, warn};
use crate::command::CommandDispatcher;
use crate::server::{ServerManager, PortMap};
use crate::filesystem::FileSystem;
use crate::backup::BackupManager;
use crate::managed_mysql::ManagedMysqlManager;
// Import generated protobuf types
pub mod pb {
@@ -27,7 +29,9 @@ use pb::*;
pub struct DaemonServiceImpl {
server_manager: Arc<ServerManager>,
command_dispatcher: Arc<CommandDispatcher>,
backup_manager: BackupManager,
managed_mysql: Arc<ManagedMysqlManager>,
daemon_token: String,
start_time: Instant,
}
@@ -35,9 +39,11 @@ pub struct DaemonServiceImpl {
impl DaemonServiceImpl {
pub fn new(
server_manager: Arc<ServerManager>,
command_dispatcher: Arc<CommandDispatcher>,
daemon_token: String,
backup_root: PathBuf,
api_url: String,
managed_mysql: Arc<ManagedMysqlManager>,
) -> Self {
let backup_manager = BackupManager::new(
server_manager.clone(),
@@ -48,7 +54,9 @@ impl DaemonServiceImpl {
Self {
server_manager,
command_dispatcher,
backup_manager,
managed_mysql,
daemon_token,
start_time: Instant::now(),
}
@@ -106,6 +114,21 @@ impl DaemonServiceImpl {
Self::env_value(env, &["CS2_RCONPW", "CS2_RCON_PASSWORD", "SRCDS_RCONPW", "RCON_PASSWORD"])
.unwrap_or_else(|| "changeme".to_string())
}
fn map_ports(ports: &[PortMapping]) -> Vec<PortMap> {
ports
.iter()
.map(|p| PortMap {
host_port: p.host_port as u16,
container_port: p.container_port as u16,
protocol: if p.protocol.is_empty() {
"tcp".to_string()
} else {
p.protocol.clone()
},
})
.collect()
}
}
type GrpcStream<T> = Pin<Box<dyn futures::Stream<Item = Result<T, Status>> + Send>>;
@@ -168,20 +191,6 @@ impl DaemonService for DaemonServiceImpl {
self.check_auth(&request)?;
let req = request.into_inner();
let ports: Vec<PortMap> = req
.ports
.iter()
.map(|p| PortMap {
host_port: p.host_port as u16,
container_port: p.container_port as u16,
protocol: if p.protocol.is_empty() {
"tcp".to_string()
} else {
p.protocol.clone()
},
})
.collect();
self.server_manager
.create_server(
req.uuid.clone(),
@@ -191,7 +200,7 @@ impl DaemonService for DaemonServiceImpl {
req.cpu_limit,
req.startup_command,
req.environment,
ports,
Self::map_ports(&req.ports),
)
.await
.map_err(|e| Status::from(e))?;
@@ -202,6 +211,33 @@ impl DaemonService for DaemonServiceImpl {
}))
}
async fn update_server(
&self,
request: Request<UpdateServerRequest>,
) -> Result<Response<ServerResponse>, Status> {
self.check_auth(&request)?;
let req = request.into_inner();
let state = self.server_manager
.update_server(
req.uuid.clone(),
req.docker_image,
req.memory_limit,
req.disk_limit,
req.cpu_limit,
req.startup_command,
req.environment,
Self::map_ports(&req.ports),
)
.await
.map_err(Status::from)?;
Ok(Response::new(ServerResponse {
uuid: req.uuid,
status: state.to_string(),
}))
}
async fn delete_server(
&self,
request: Request<ServerIdentifier>,
@@ -232,6 +268,85 @@ impl DaemonService for DaemonServiceImpl {
Ok(Response::new(Empty {}))
}
async fn create_database(
&self,
request: Request<CreateDatabaseRequest>,
) -> Result<Response<ManagedDatabaseCredentials>, Status> {
self.check_auth(&request)?;
let req = request.into_inner();
if req.server_uuid.trim().is_empty() {
return Err(Status::invalid_argument("Server UUID is required"));
}
if req.name.trim().is_empty() {
return Err(Status::invalid_argument("Database name is required"));
}
let password = req.password.trim();
let database = self
.managed_mysql
.create_database(
req.server_uuid.trim(),
req.name.trim(),
if password.is_empty() { None } else { Some(password) },
)
.await
.map_err(Status::from)?;
Ok(Response::new(ManagedDatabaseCredentials {
database_name: database.database_name,
username: database.username,
password: database.password,
host: database.host,
port: i32::from(database.port),
phpmyadmin_url: database.phpmyadmin_url.unwrap_or_default(),
}))
}
async fn update_database_password(
&self,
request: Request<UpdateDatabasePasswordRequest>,
) -> Result<Response<Empty>, Status> {
self.check_auth(&request)?;
let req = request.into_inner();
if req.username.trim().is_empty() {
return Err(Status::invalid_argument("Database username is required"));
}
if req.password.trim().is_empty() {
return Err(Status::invalid_argument("Database password is required"));
}
self.managed_mysql
.update_password(req.username.trim(), req.password.trim())
.await
.map_err(Status::from)?;
Ok(Response::new(Empty {}))
}
async fn delete_database(
&self,
request: Request<DeleteDatabaseRequest>,
) -> Result<Response<Empty>, Status> {
self.check_auth(&request)?;
let req = request.into_inner();
if req.database_name.trim().is_empty() {
return Err(Status::invalid_argument("Database name is required"));
}
if req.username.trim().is_empty() {
return Err(Status::invalid_argument("Database username is required"));
}
self.managed_mysql
.delete_database(req.database_name.trim(), req.username.trim())
.await
.map_err(Status::from)?;
Ok(Response::new(Empty {}))
}
// === Power ===
async fn set_power_state(
@@ -331,31 +446,7 @@ impl DaemonService for DaemonServiceImpl {
self.check_auth(&request)?;
let req = request.into_inner();
if let Some((image, env)) = self.get_server_runtime(&req.uuid).await {
let image = image.to_lowercase();
if image.contains("cs2") || image.contains("csgo") {
let host = Self::env_value(&env, &["RCON_HOST"])
.unwrap_or_else(|| "127.0.0.1".to_string());
let port = Self::env_u16(&env, &["RCON_PORT", "CS2_PORT"]).unwrap_or(27015);
let password = Self::cs2_rcon_password(&env);
let address = format!("{}:{}", host, port);
match crate::game::rcon::RconClient::connect(&address, &password).await {
Ok(mut client) => match client.command(&req.command).await {
Ok(_) => return Ok(Response::new(Empty {})),
Err(e) => {
warn!(uuid = %req.uuid, command = %req.command, error = %e, "CS2 RCON command failed");
}
},
Err(e) => {
warn!(uuid = %req.uuid, command = %req.command, error = %e, "CS2 RCON connect failed");
}
}
}
}
self.server_manager
.docker()
self.command_dispatcher
.send_command(&req.uuid, &req.command)
.await
.map_err(|e| Status::internal(e.to_string()))?;
+14
View File
@@ -6,19 +6,23 @@ use tracing_subscriber::EnvFilter;
mod auth;
mod backup;
mod command;
mod config;
mod docker;
mod error;
mod filesystem;
mod game;
mod grpc;
mod managed_mysql;
mod scheduler;
mod server;
use crate::docker::DockerManager;
use crate::grpc::DaemonServiceImpl;
use crate::grpc::service::pb::daemon_service_server::DaemonServiceServer;
use crate::managed_mysql::ManagedMysqlManager;
use crate::server::ServerManager;
use crate::command::CommandDispatcher;
const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 32 * 1024 * 1024;
@@ -45,12 +49,21 @@ async fn main() -> Result<()> {
let server_manager = Arc::new(ServerManager::new(docker, &config));
info!("Server manager initialized");
// Initialize shared command dispatcher (single command pipeline for all games/sources)
let command_dispatcher = Arc::new(CommandDispatcher::new(server_manager.clone()));
info!("Command dispatcher initialized");
let managed_mysql = Arc::new(ManagedMysqlManager::new(config.managed_mysql.clone())?);
info!(enabled = managed_mysql.is_enabled(), "Managed MySQL initialized");
// Create gRPC service
let daemon_service = DaemonServiceImpl::new(
server_manager.clone(),
command_dispatcher.clone(),
config.node_token.clone(),
config.backup_path.clone(),
config.api_url.clone(),
managed_mysql.clone(),
);
// Start gRPC server
@@ -68,6 +81,7 @@ async fn main() -> Result<()> {
// Scheduler task
let sched = Arc::new(scheduler::Scheduler::new(
server_manager.clone(),
command_dispatcher.clone(),
config.api_url.clone(),
config.node_token.clone(),
));
+369
View File
@@ -0,0 +1,369 @@
use std::io::ErrorKind;
use reqwest::Url;
use thiserror::Error;
use tokio::process::Command;
use tonic::Status;
use uuid::Uuid;
use crate::config::ManagedMysqlConfig;
#[derive(Debug, Clone)]
struct ManagedMysqlRuntimeConfig {
admin_database: String,
admin_host: String,
admin_password: String,
admin_port: u16,
admin_username: String,
client_bin: Option<String>,
connection_host: String,
connection_port: u16,
phpmyadmin_url: Option<String>,
}
#[derive(Debug, Clone)]
pub struct ManagedMysqlDatabase {
pub database_name: String,
pub username: String,
pub password: String,
pub host: String,
pub port: u16,
pub phpmyadmin_url: Option<String>,
}
#[derive(Debug, Error)]
pub enum ManagedMysqlError {
#[error("Managed MySQL is not configured on this node")]
NotConfigured,
#[error("Managed MySQL configuration is invalid: {0}")]
InvalidConfig(String),
#[error("Managed MySQL client binary is not installed on this node")]
ClientMissing,
#[error("Managed MySQL command failed: {0}")]
CommandFailed(String),
#[error("Managed MySQL I/O error: {0}")]
Io(#[from] std::io::Error),
}
impl From<ManagedMysqlError> for Status {
fn from(error: ManagedMysqlError) -> Self {
match error {
ManagedMysqlError::NotConfigured | ManagedMysqlError::ClientMissing => {
Status::failed_precondition(error.to_string())
}
ManagedMysqlError::InvalidConfig(_) => Status::internal(error.to_string()),
ManagedMysqlError::CommandFailed(_) => Status::internal(error.to_string()),
ManagedMysqlError::Io(_) => Status::internal(error.to_string()),
}
}
}
#[derive(Debug, Clone)]
pub struct ManagedMysqlManager {
config: Option<ManagedMysqlRuntimeConfig>,
}
impl ManagedMysqlManager {
pub fn new(config: Option<ManagedMysqlConfig>) -> Result<Self, ManagedMysqlError> {
let runtime = match config {
Some(config) => Some(resolve_runtime_config(config)?),
None => None,
};
Ok(Self { config: runtime })
}
pub fn is_enabled(&self) -> bool {
self.config.is_some()
}
pub async fn create_database(
&self,
server_uuid: &str,
label: &str,
password: Option<&str>,
) -> Result<ManagedMysqlDatabase, ManagedMysqlError> {
let config = self.config.as_ref().ok_or(ManagedMysqlError::NotConfigured)?;
let label = label.trim();
if label.is_empty() {
return Err(ManagedMysqlError::CommandFailed(
"Database name is required".to_string(),
));
}
let database_name = build_database_name(server_uuid, label);
let username = build_username(server_uuid);
let password = build_password(password);
self.run_sql(
config,
&format!(
"CREATE DATABASE {} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci",
escape_identifier(&database_name)
),
)
.await?;
if let Err(error) = self
.run_sql(
config,
&format!(
"CREATE USER {}@'%' IDENTIFIED BY {};GRANT ALL PRIVILEGES ON {}.* TO {}@'%'",
escape_string(&username),
escape_string(&password),
escape_identifier(&database_name),
escape_string(&username),
),
)
.await
{
let _ = self
.run_sql(
config,
&format!("DROP DATABASE IF EXISTS {}", escape_identifier(&database_name)),
)
.await;
return Err(error);
}
Ok(ManagedMysqlDatabase {
database_name: database_name.clone(),
username,
password,
host: config.connection_host.clone(),
port: config.connection_port,
phpmyadmin_url: build_phpmyadmin_url(config.phpmyadmin_url.as_deref(), &database_name),
})
}
pub async fn update_password(
&self,
username: &str,
password: &str,
) -> Result<(), ManagedMysqlError> {
let config = self.config.as_ref().ok_or(ManagedMysqlError::NotConfigured)?;
let password = password.trim();
if password.is_empty() {
return Err(ManagedMysqlError::CommandFailed(
"Database password is required".to_string(),
));
}
self.run_sql(
config,
&format!(
"ALTER USER {}@'%' IDENTIFIED BY {}",
escape_string(username),
escape_string(password),
),
)
.await
}
pub async fn delete_database(
&self,
database_name: &str,
username: &str,
) -> Result<(), ManagedMysqlError> {
let config = self.config.as_ref().ok_or(ManagedMysqlError::NotConfigured)?;
self.run_sql(
config,
&format!(
"DROP DATABASE IF EXISTS {};DROP USER IF EXISTS {}@'%'",
escape_identifier(database_name),
escape_string(username),
),
)
.await
}
async fn run_sql(
&self,
config: &ManagedMysqlRuntimeConfig,
sql: &str,
) -> Result<(), ManagedMysqlError> {
let binaries = match config.client_bin.as_deref() {
Some(bin) if !bin.trim().is_empty() => vec![bin.to_string()],
_ => vec!["mariadb".to_string(), "mysql".to_string()],
};
let mut missing_binary = false;
for binary in binaries {
let output = Command::new(&binary)
.args([
"--protocol=TCP",
"--batch",
"--skip-column-names",
"-h",
&config.admin_host,
"-P",
&config.admin_port.to_string(),
"-u",
&config.admin_username,
&config.admin_database,
"-e",
sql,
])
.env("MYSQL_PWD", &config.admin_password)
.output()
.await;
match output {
Ok(output) if output.status.success() => return Ok(()),
Ok(output) => {
let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
let message = if !stderr.is_empty() {
stderr
} else if !stdout.is_empty() {
stdout
} else {
format!("{} exited with status {}", binary, output.status)
};
return Err(ManagedMysqlError::CommandFailed(message));
}
Err(error) if error.kind() == ErrorKind::NotFound => {
missing_binary = true;
continue;
}
Err(error) => return Err(ManagedMysqlError::Io(error)),
}
}
if missing_binary {
return Err(ManagedMysqlError::ClientMissing);
}
Err(ManagedMysqlError::ClientMissing)
}
}
fn resolve_runtime_config(
config: ManagedMysqlConfig,
) -> Result<ManagedMysqlRuntimeConfig, ManagedMysqlError> {
let parsed = Url::parse(&config.url)
.map_err(|error| ManagedMysqlError::InvalidConfig(error.to_string()))?;
if parsed.scheme() != "mysql" && parsed.scheme() != "mariadb" {
return Err(ManagedMysqlError::InvalidConfig(
"url must use mysql:// or mariadb://".to_string(),
));
}
let admin_host = parsed.host_str().unwrap_or_default().trim().to_string();
let admin_username = parsed.username().trim().to_string();
if admin_host.is_empty() || admin_username.is_empty() {
return Err(ManagedMysqlError::InvalidConfig(
"url must include host and username".to_string(),
));
}
let admin_database = {
let trimmed = parsed.path().trim_start_matches('/').trim();
if trimmed.is_empty() {
"mysql".to_string()
} else {
trimmed.to_string()
}
};
Ok(ManagedMysqlRuntimeConfig {
admin_database,
admin_host: admin_host.clone(),
admin_password: parsed.password().unwrap_or_default().to_string(),
admin_port: parsed.port().unwrap_or(3306),
admin_username,
client_bin: config.bin,
connection_host: config.connection_host.unwrap_or(admin_host),
connection_port: config.connection_port.unwrap_or(parsed.port().unwrap_or(3306)),
phpmyadmin_url: config.phpmyadmin_url,
})
}
fn normalize_token(value: &str, fallback: &str, max_len: usize) -> String {
let mut normalized = String::with_capacity(value.len());
for ch in value.chars() {
if ch.is_ascii_alphanumeric() {
normalized.push(ch.to_ascii_lowercase());
} else if !normalized.ends_with('_') {
normalized.push('_');
}
}
let trimmed = normalized.trim_matches('_');
if trimmed.is_empty() {
return fallback.to_string();
}
trimmed
.chars()
.take(max_len)
.collect::<String>()
.trim_end_matches('_')
.to_string()
}
fn build_database_name(server_uuid: &str, label: &str) -> String {
let server_token = normalize_token(&server_uuid.replace('-', ""), "server", 12);
let label_token = normalize_token(label, "db", 16);
let suffix = Uuid::new_v4().simple().to_string();
format!("srv_{}_{}_{}", server_token, label_token, &suffix[..8])
.chars()
.take(64)
.collect::<String>()
.trim_end_matches('_')
.to_string()
}
fn build_username(server_uuid: &str) -> String {
let server_token = normalize_token(&server_uuid.replace('-', ""), "server", 8);
let suffix = Uuid::new_v4().simple().to_string();
format!("u_{}_{}", server_token, &suffix[..8])
.chars()
.take(32)
.collect::<String>()
.trim_end_matches('_')
.to_string()
}
fn build_password(password: Option<&str>) -> String {
match password {
Some(password) if !password.trim().is_empty() => password.trim().to_string(),
_ => {
let first = Uuid::new_v4().simple().to_string();
let second = Uuid::new_v4().simple().to_string();
format!("{}{}", first, second)
}
}
}
fn escape_identifier(value: &str) -> String {
format!("`{}`", value.replace('`', "``"))
}
fn escape_string(value: &str) -> String {
format!("'{}'", value.replace('\\', "\\\\").replace('\'', "''"))
}
fn build_phpmyadmin_url(base_url: Option<&str>, database_name: &str) -> Option<String> {
let base_url = base_url?.trim();
if base_url.is_empty() {
return None;
}
match Url::parse(base_url) {
Ok(mut url) => {
url.query_pairs_mut().append_pair("db", database_name);
Some(url.to_string())
}
Err(_) => Some(base_url.to_string()),
}
}
+5 -3
View File
@@ -4,6 +4,7 @@ use tokio::time::{interval, Duration};
use tracing::{info, error, warn};
use serde::Deserialize;
use crate::command::CommandDispatcher;
use crate::server::ServerManager;
/// A scheduled task received from the panel API.
@@ -21,6 +22,7 @@ pub struct ScheduledTask {
/// Scheduler that polls the panel API for due tasks and executes them.
pub struct Scheduler {
server_manager: Arc<ServerManager>,
command_dispatcher: Arc<CommandDispatcher>,
api_url: String,
node_token: String,
poll_interval_secs: u64,
@@ -29,11 +31,13 @@ pub struct Scheduler {
impl Scheduler {
pub fn new(
server_manager: Arc<ServerManager>,
command_dispatcher: Arc<CommandDispatcher>,
api_url: String,
node_token: String,
) -> Self {
Self {
server_manager,
command_dispatcher,
api_url,
node_token,
poll_interval_secs: 15,
@@ -117,9 +121,7 @@ impl Scheduler {
match task.action.as_str() {
"command" => {
// Send command to server's stdin via Docker exec
let docker = self.server_manager.docker();
docker
self.command_dispatcher
.send_command(&task.server_uuid, &task.payload)
.await?;
}
+133 -14
View File
@@ -20,6 +20,27 @@ pub struct ServerManager {
}
impl ServerManager {
async fn ensure_server_data_dir(&self, data_path: &PathBuf) -> Result<(), DaemonError> {
tokio::fs::create_dir_all(data_path)
.await
.map_err(DaemonError::Io)?;
#[cfg(unix)]
{
// Containers may run with non-root users (e.g. steam uid 1000).
// Keep server directory writable to avoid install/start failures.
let permissions = std::fs::Permissions::from_mode(0o777);
tokio::fs::set_permissions(data_path, permissions)
.await
.map_err(DaemonError::Io)?;
}
Ok(())
}
fn is_running_state(state: &str) -> bool {
matches!(state, "running" | "restarting")
}
pub fn new(docker: Arc<DockerManager>, config: &DaemonConfig) -> Self {
Self {
servers: Arc::new(RwLock::new(HashMap::new())),
@@ -61,20 +82,7 @@ impl ServerManager {
}
let data_path = self.data_root.join(&uuid);
// Create data directory
tokio::fs::create_dir_all(&data_path)
.await
.map_err(DaemonError::Io)?;
#[cfg(unix)]
{
// Containers may run with non-root users (e.g. steam uid 1000).
// Keep server directory writable to avoid install/start failures.
let permissions = std::fs::Permissions::from_mode(0o777);
tokio::fs::set_permissions(&data_path, permissions)
.await
.map_err(DaemonError::Io)?;
}
self.ensure_server_data_dir(&data_path).await?;
let spec = ServerSpec {
uuid: uuid.clone(),
@@ -109,6 +117,117 @@ impl ServerManager {
Ok(())
}
/// Recreate a server container with updated runtime configuration while preserving data files.
pub async fn update_server(
&self,
uuid: String,
docker_image: String,
memory_limit: i64,
disk_limit: i64,
cpu_limit: i32,
startup_command: String,
environment: HashMap<String, String>,
ports: Vec<PortMap>,
) -> Result<ServerState, DaemonError> {
let existing = {
let servers = self.servers.read().await;
servers.get(&uuid).cloned()
};
if matches!(existing.as_ref().map(|spec| &spec.state), Some(ServerState::Installing)) {
return Err(DaemonError::InvalidStateTransition {
current: "installing".to_string(),
requested: "update".to_string(),
});
}
let runtime_state = self
.docker
.container_state(&uuid)
.await
.map_err(|e| DaemonError::Internal(format!("Failed to inspect container: {}", e)))?;
if existing.is_none() && runtime_state.is_none() {
return Err(DaemonError::ServerNotFound(uuid));
}
let should_restart = runtime_state
.as_deref()
.map(Self::is_running_state)
.unwrap_or_else(|| {
existing
.as_ref()
.map(|spec| matches!(spec.state, ServerState::Running | ServerState::Starting))
.unwrap_or(false)
});
let data_path = existing
.as_ref()
.map(|spec| spec.data_path.clone())
.unwrap_or_else(|| self.data_root.join(&uuid));
self.ensure_server_data_dir(&data_path).await?;
let mut desired_spec = ServerSpec {
uuid: uuid.clone(),
docker_image,
memory_limit,
disk_limit,
cpu_limit,
startup_command,
environment,
ports,
data_path,
state: ServerState::Stopped,
container_id: None,
};
if runtime_state
.as_deref()
.map(Self::is_running_state)
.unwrap_or(false)
{
if let Err(stop_error) = self.docker.stop_container(&uuid, 30).await {
warn!(uuid = %uuid, error = %stop_error, "Graceful stop failed during server update, forcing kill");
self.docker.kill_container(&uuid).await.map_err(|e| {
DaemonError::Internal(format!("Failed to stop running container during update: {}", e))
})?;
}
}
if runtime_state.is_some() {
self.docker.remove_container(&uuid).await.map_err(|e| {
DaemonError::Internal(format!("Failed to remove existing container during update: {}", e))
})?;
}
match self.docker.create_container(&desired_spec).await {
Ok(container_id) => {
desired_spec.container_id = Some(container_id);
}
Err(error) => {
desired_spec.state = ServerState::Error;
let mut servers = self.servers.write().await;
servers.insert(uuid.clone(), desired_spec);
return Err(DaemonError::Internal(format!(
"Failed to recreate container during update: {}",
error
)));
}
}
{
let mut servers = self.servers.write().await;
servers.insert(uuid.clone(), desired_spec);
}
if should_restart {
self.start_server(&uuid).await?;
return Ok(ServerState::Running);
}
Ok(ServerState::Stopped)
}
/// Install a server: pull image, create container.
async fn install_server(
docker: Arc<DockerManager>,