chore: initial commit for phase04

hibna 2026-02-21 15:50:35 +03:00
parent d0c20581b6
commit 218452706c
15 changed files with 4310 additions and 8 deletions

apps/daemon/Cargo.lock (generated, new file, 2,849 added lines): diff suppressed because it is too large.

@ -12,6 +12,7 @@ prost-types = "0.13"
# Async runtime
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1", features = ["sync"] }
# Docker
bollard = "0.18"
@ -35,5 +36,12 @@ thiserror = "2"
# UUID
uuid = { version = "1", features = ["v4"] }
# Async utils
futures = "0.3"
# Filesystem
tar = "0.4"
flate2 = "1"
[build-dependencies]
tonic-build = "0.12"

apps/daemon/src/auth.rs (new file)

@ -0,0 +1,15 @@
use tonic::{Request, Status};
/// Validate the daemon token from the gRPC request metadata.
pub fn check_auth(req: &Request<()>, expected_token: &str) -> Result<(), Status> {
let token = req
.metadata()
.get("authorization")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.strip_prefix("Bearer "));
match token {
Some(t) if t == expected_token => Ok(()),
_ => Err(Status::unauthenticated("Invalid or missing daemon token")),
}
}
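
For reference, a minimal sketch of calling check_auth from a handler; note that the gRPC service later in this commit re-implements a generic version of the same check, since this helper only accepts Request<()>. The handler name and token source below are placeholders, not part of this commit.

use tonic::{Request, Response, Status};

// Hypothetical handler: rejects the call unless the request metadata carries
// `authorization: Bearer <expected>`.
async fn guarded_handler(req: Request<()>, expected: &str) -> Result<Response<()>, Status> {
    crate::auth::check_auth(&req, expected)?;
    Ok(Response::new(()))
}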


@ -0,0 +1,263 @@
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;
use bollard::container::{
Config, CreateContainerOptions, LogsOptions, RemoveContainerOptions, StartContainerOptions,
StopContainerOptions, StatsOptions, Stats,
};
use bollard::image::CreateImageOptions;
use bollard::models::{HostConfig, PortBinding};
use futures::StreamExt;
use tracing::info;
use crate::docker::DockerManager;
use crate::server::ServerSpec;
/// Container name prefix for all managed game servers.
const CONTAINER_PREFIX: &str = "gp_";
pub fn container_name(server_uuid: &str) -> String {
format!("{}{}", CONTAINER_PREFIX, server_uuid)
}
impl DockerManager {
/// Pull a Docker image if not already present.
pub async fn pull_image(&self, image: &str) -> Result<()> {
info!(image = %image, "Pulling Docker image");
let options = CreateImageOptions {
from_image: image,
..Default::default()
};
let mut stream = self.client().create_image(Some(options), None, None);
while let Some(result) = stream.next().await {
match result {
Ok(info) => {
if let Some(status) = &info.status {
tracing::debug!(status = %status, "Image pull progress");
}
}
Err(e) => return Err(e.into()),
}
}
info!(image = %image, "Image pulled successfully");
Ok(())
}
/// Create and configure a container for a game server.
pub async fn create_container(&self, spec: &ServerSpec) -> Result<String> {
let name = container_name(&spec.uuid);
// Build port bindings
let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
for port_map in &spec.ports {
let container_port = format!("{}/{}", port_map.container_port, port_map.protocol);
port_bindings.insert(
container_port,
Some(vec![PortBinding {
host_ip: Some("0.0.0.0".to_string()),
host_port: Some(port_map.host_port.to_string()),
}]),
);
}
// Build exposed ports
let mut exposed_ports: HashMap<String, HashMap<(), ()>> = HashMap::new();
for port_map in &spec.ports {
let container_port = format!("{}/{}", port_map.container_port, port_map.protocol);
exposed_ports.insert(container_port, HashMap::new());
}
// Convert env map to Docker format
let env: Vec<String> = spec
.environment
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect();
let host_config = HostConfig {
memory: Some(spec.memory_limit),
memory_swap: Some(spec.memory_limit), // no swap
nano_cpus: Some((spec.cpu_limit as i64) * 10_000_000), // cpu_limit=100 means 1 core
port_bindings: Some(port_bindings),
network_mode: Some(self.network_name().to_string()),
binds: Some(vec![format!(
"{}:/data",
spec.data_path.display()
)]),
..Default::default()
};
let config = Config {
image: Some(spec.docker_image.clone()),
hostname: Some(spec.uuid.clone()),
env: Some(env),
exposed_ports: Some(exposed_ports),
host_config: Some(host_config),
working_dir: Some("/data".to_string()),
cmd: if spec.startup_command.is_empty() {
None
} else {
Some(
spec.startup_command
.split_whitespace()
.map(String::from)
.collect(),
)
},
tty: Some(true),
attach_stdin: Some(true),
attach_stdout: Some(true),
attach_stderr: Some(true),
open_stdin: Some(true),
..Default::default()
};
let options = CreateContainerOptions { name: name.as_str(), platform: None };
let response = self.client().create_container(Some(options), config).await?;
info!(container_id = %response.id, uuid = %spec.uuid, "Container created");
Ok(response.id)
}
/// Start a container.
pub async fn start_container(&self, server_uuid: &str) -> Result<()> {
let name = container_name(server_uuid);
self.client()
.start_container(&name, None::<StartContainerOptions<String>>)
.await?;
info!(uuid = %server_uuid, "Container started");
Ok(())
}
/// Stop a container gracefully.
pub async fn stop_container(&self, server_uuid: &str, timeout_secs: i64) -> Result<()> {
let name = container_name(server_uuid);
self.client()
.stop_container(
&name,
Some(StopContainerOptions {
t: timeout_secs,
}),
)
.await?;
info!(uuid = %server_uuid, "Container stopped");
Ok(())
}
/// Kill a container immediately.
pub async fn kill_container(&self, server_uuid: &str) -> Result<()> {
let name = container_name(server_uuid);
self.client()
.kill_container::<String>(&name, None)
.await?;
info!(uuid = %server_uuid, "Container killed");
Ok(())
}
/// Remove a container and its volumes.
pub async fn remove_container(&self, server_uuid: &str) -> Result<()> {
let name = container_name(server_uuid);
self.client()
.remove_container(
&name,
Some(RemoveContainerOptions {
force: true,
v: true,
..Default::default()
}),
)
.await?;
info!(uuid = %server_uuid, "Container removed");
Ok(())
}
/// Get container stats (CPU, memory, network).
pub async fn container_stats(
&self,
server_uuid: &str,
) -> Result<Stats> {
let name = container_name(server_uuid);
let mut stream = self.client().stats(
&name,
Some(StatsOptions {
stream: false,
one_shot: true,
..Default::default()
}),
);
match stream.next().await {
Some(Ok(stats)) => Ok(stats),
Some(Err(e)) => Err(e.into()),
None => Err(anyhow::anyhow!("No stats returned")),
}
}
/// Check if a container exists and return its state.
pub async fn container_state(
&self,
server_uuid: &str,
) -> Result<Option<String>> {
let name = container_name(server_uuid);
match self.client().inspect_container(&name, None).await {
Ok(info) => {
let state = info
.state
.and_then(|s| s.status)
.map(|s| format!("{:?}", s));
Ok(state)
}
Err(bollard::errors::Error::DockerResponseServerError {
status_code: 404, ..
}) => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Stream container logs (stdout + stderr). Returns an owned stream.
pub fn stream_logs(
self: &Arc<Self>,
server_uuid: &str,
) -> impl futures::Stream<Item = Result<String, bollard::errors::Error>> + Send + 'static {
let name = container_name(server_uuid);
let options = LogsOptions::<String> {
follow: true,
stdout: true,
stderr: true,
tail: "100".to_string(),
..Default::default()
};
let client = self.client().clone();
client.logs(&name, Some(options)).map(|result| {
result.map(|output| output.to_string())
})
}
/// Send a command to a container via exec (attach to stdin).
pub async fn send_command(&self, server_uuid: &str, command: &str) -> Result<()> {
let name = container_name(server_uuid);
// Bind the formatted shell command so the exec options can borrow it as &str.
let shell_cmd = format!("echo '{}' > /proc/1/fd/0", command);
let exec = self
.client()
.create_exec(
&name,
bollard::exec::CreateExecOptions {
cmd: Some(vec!["sh", "-c", shell_cmd.as_str()]),
attach_stdout: Some(true),
attach_stderr: Some(true),
..Default::default()
},
)
.await?;
self.client()
.start_exec(&exec.id, None::<bollard::exec::StartExecOptions>)
.await?;
Ok(())
}
}
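
A quick sanity check of the naming and CPU-limit conventions above, sketched as a unit test; its placement in this module is an assumption and it is not part of this commit.

#[cfg(test)]
mod tests {
    use super::container_name;

    #[test]
    fn container_names_are_prefixed() {
        assert_eq!(container_name("abc-123"), "gp_abc-123");
    }

    #[test]
    fn cpu_limit_maps_to_nano_cpus() {
        // cpu_limit is a percentage: 100 => 1 full core => 1_000_000_000 nano-CPUs.
        let cpu_limit: i32 = 250;
        assert_eq!((cpu_limit as i64) * 10_000_000, 2_500_000_000);
    }
}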


@ -0,0 +1,77 @@
use anyhow::Result;
use bollard::Docker;
use bollard::network::CreateNetworkOptions;
use tracing::info;
use crate::config::DockerConfig;
/// Manages the Docker client and network setup.
#[derive(Clone)]
pub struct DockerManager {
client: Docker,
network_name: String,
}
impl DockerManager {
pub async fn new(config: &DockerConfig) -> Result<Self> {
let client = Docker::connect_with_socket(
&config.socket,
120, // timeout
bollard::API_DEFAULT_VERSION,
)?;
// Verify connection
let version = client.version().await?;
info!(
docker_version = version.version.as_deref().unwrap_or("unknown"),
"Connected to Docker"
);
let manager = Self {
client,
network_name: config.network.clone(),
};
manager.ensure_network(&config.network_subnet).await?;
Ok(manager)
}
pub fn client(&self) -> &Docker {
&self.client
}
pub fn network_name(&self) -> &str {
&self.network_name
}
async fn ensure_network(&self, subnet: &str) -> Result<()> {
let networks = self.client.list_networks::<String>(None).await?;
let exists = networks
.iter()
.any(|n| n.name.as_deref() == Some(&self.network_name));
if !exists {
info!(network = %self.network_name, "Creating Docker network");
let ipam_config = bollard::models::IpamConfig {
subnet: Some(subnet.to_string()),
..Default::default()
};
let ipam = bollard::models::Ipam {
config: Some(vec![ipam_config]),
..Default::default()
};
self.client
.create_network(CreateNetworkOptions {
name: self.network_name.clone(),
driver: "bridge".to_string(),
ipam,
..Default::default()
})
.await?;
info!(network = %self.network_name, "Docker network created");
}
Ok(())
}
}


@ -0,0 +1,4 @@
pub mod container;
pub mod manager;
pub use manager::DockerManager;

apps/daemon/src/error.rs (new file)

@ -0,0 +1,52 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum DaemonError {
#[error("Docker error: {0}")]
Docker(#[from] bollard::errors::Error),
#[error("Server not found: {0}")]
ServerNotFound(String),
#[error("Server already exists: {0}")]
ServerAlreadyExists(String),
#[error("Invalid state transition: {current} -> {requested}")]
InvalidStateTransition { current: String, requested: String },
#[error("Filesystem error: {0}")]
Filesystem(String),
#[error("Path traversal attempt: {0}")]
PathTraversal(String),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Authentication failed")]
AuthFailed,
#[error("{0}")]
Internal(String),
}
impl From<DaemonError> for tonic::Status {
fn from(err: DaemonError) -> Self {
match &err {
DaemonError::ServerNotFound(_) => tonic::Status::not_found(err.to_string()),
DaemonError::ServerAlreadyExists(_) => {
tonic::Status::already_exists(err.to_string())
}
DaemonError::InvalidStateTransition { .. } => {
tonic::Status::failed_precondition(err.to_string())
}
DaemonError::PathTraversal(_) => {
tonic::Status::permission_denied(err.to_string())
}
DaemonError::AuthFailed => {
tonic::Status::unauthenticated(err.to_string())
}
_ => tonic::Status::internal(err.to_string()),
}
}
}
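
A sketch of how the Status mapping could be exercised in a test (hypothetical, not part of this commit):

#[cfg(test)]
mod tests {
    use super::DaemonError;

    #[test]
    fn errors_map_to_expected_grpc_codes() {
        let status: tonic::Status = DaemonError::ServerNotFound("abc".into()).into();
        assert_eq!(status.code(), tonic::Code::NotFound);

        let status: tonic::Status = DaemonError::PathTraversal("../etc".into()).into();
        assert_eq!(status.code(), tonic::Code::PermissionDenied);
    }
}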


@ -0,0 +1,3 @@
pub mod operations;
pub use operations::FileSystem;


@ -0,0 +1,129 @@
use std::path::PathBuf;
use tokio::fs;
use tracing::debug;
use crate::error::DaemonError;
/// Filesystem operations with path jail enforcement.
pub struct FileSystem {
root: PathBuf,
}
impl FileSystem {
pub fn new(root: PathBuf) -> Self {
Self { root }
}
/// Resolve a relative path within the jail. Prevents path traversal.
fn resolve(&self, relative: &str) -> Result<PathBuf, DaemonError> {
let clean = relative.trim_start_matches('/');
// Reject ".." components outright: the lexical prefix check below cannot
// catch traversal through paths that do not exist yet.
if std::path::Path::new(clean)
.components()
.any(|c| matches!(c, std::path::Component::ParentDir))
{
return Err(DaemonError::PathTraversal(relative.to_string()));
}
let resolved = self.root.join(clean);
// Canonicalize both to compare (handle .. and symlinks)
// For non-existent paths, check the parent
let check_path = if resolved.exists() {
resolved.canonicalize().map_err(DaemonError::Io)?
} else {
let parent = resolved
.parent()
.ok_or_else(|| DaemonError::PathTraversal(relative.to_string()))?;
if !parent.exists() {
// Parent doesn't exist either — check the root prefix
let normalized = self.root.join(clean);
if !normalized.starts_with(&self.root) {
return Err(DaemonError::PathTraversal(relative.to_string()));
}
return Ok(normalized);
}
let canonical_parent = parent.canonicalize().map_err(DaemonError::Io)?;
canonical_parent.join(resolved.file_name().unwrap_or_default())
};
let canonical_root = self.root.canonicalize().unwrap_or_else(|_| self.root.clone());
if !check_path.starts_with(&canonical_root) {
return Err(DaemonError::PathTraversal(relative.to_string()));
}
Ok(resolved)
}
/// List files in a directory.
pub async fn list_files(&self, path: &str) -> Result<Vec<FileEntry>, DaemonError> {
let resolved = self.resolve(path)?;
let mut entries = Vec::new();
let mut reader = fs::read_dir(&resolved).await.map_err(DaemonError::Io)?;
while let Some(entry) = reader.next_entry().await.map_err(DaemonError::Io)? {
let metadata = entry.metadata().await.map_err(DaemonError::Io)?;
let name = entry.file_name().to_string_lossy().to_string();
let relative_path = format!(
"{}/{}",
path.trim_end_matches('/'),
&name
);
entries.push(FileEntry {
name,
path: relative_path,
is_directory: metadata.is_dir(),
size: metadata.len() as i64,
modified_at: metadata
.modified()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_secs() as i64)
.unwrap_or(0),
});
}
entries.sort_by(|a, b| {
// Directories first, then by name
b.is_directory.cmp(&a.is_directory).then(a.name.cmp(&b.name))
});
Ok(entries)
}
/// Read file contents.
pub async fn read_file(&self, path: &str) -> Result<Vec<u8>, DaemonError> {
let resolved = self.resolve(path)?;
debug!(path = %resolved.display(), "Reading file");
fs::read(&resolved).await.map_err(DaemonError::Io)
}
/// Write file contents.
pub async fn write_file(&self, path: &str, data: &[u8]) -> Result<(), DaemonError> {
let resolved = self.resolve(path)?;
// Ensure parent directory exists
if let Some(parent) = resolved.parent() {
fs::create_dir_all(parent).await.map_err(DaemonError::Io)?;
}
debug!(path = %resolved.display(), "Writing file");
fs::write(&resolved, data).await.map_err(DaemonError::Io)
}
/// Delete files or directories.
pub async fn delete_paths(&self, paths: &[String]) -> Result<(), DaemonError> {
for path in paths {
let resolved = self.resolve(path)?;
if resolved.is_dir() {
fs::remove_dir_all(&resolved).await.map_err(DaemonError::Io)?;
} else {
fs::remove_file(&resolved).await.map_err(DaemonError::Io)?;
}
debug!(path = %resolved.display(), "Deleted");
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct FileEntry {
pub name: String,
pub path: String,
pub is_directory: bool,
pub size: i64,
pub modified_at: i64,
}
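
A sketch of a test for the path jail; it assumes the tempfile dev-dependency for a scratch directory, which is not declared in this commit.

#[cfg(test)]
mod tests {
    use super::FileSystem;

    #[tokio::test]
    async fn rejects_paths_that_escape_the_root() {
        let dir = tempfile::tempdir().unwrap();
        let fs = FileSystem::new(dir.path().to_path_buf());
        // A path that climbs out of the jail must not be readable.
        assert!(fs.read_file("../../etc/passwd").await.is_err());
        // A path inside the jail works normally.
        fs.write_file("config/server.properties", b"motd=hello").await.unwrap();
        assert_eq!(
            fs.read_file("config/server.properties").await.unwrap(),
            b"motd=hello".to_vec()
        );
    }
}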


@ -0,0 +1,3 @@
pub mod service;
pub use service::DaemonServiceImpl;


@ -0,0 +1,511 @@
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use futures::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tracing::{info, error};
use crate::server::{ServerManager, PortMap};
use crate::filesystem::FileSystem;
// Import generated protobuf types
pub mod pb {
tonic::include_proto!("gamepanel.daemon");
}
use pb::daemon_service_server::DaemonService;
use pb::*;
pub struct DaemonServiceImpl {
server_manager: Arc<ServerManager>,
daemon_token: String,
start_time: Instant,
}
impl DaemonServiceImpl {
pub fn new(server_manager: Arc<ServerManager>, daemon_token: String) -> Self {
Self {
server_manager,
daemon_token,
start_time: Instant::now(),
}
}
fn check_auth<T>(&self, req: &Request<T>) -> Result<(), Status> {
let token = req
.metadata()
.get("authorization")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.strip_prefix("Bearer "));
match token {
Some(t) if t == self.daemon_token => Ok(()),
_ => Err(Status::unauthenticated("Invalid or missing daemon token")),
}
}
fn get_fs(&self, uuid: &str) -> FileSystem {
let data_path = self.server_manager.data_root().join(uuid);
FileSystem::new(data_path)
}
}
type GrpcStream<T> = Pin<Box<dyn futures::Stream<Item = Result<T, Status>> + Send>>;
#[tonic::async_trait]
impl DaemonService for DaemonServiceImpl {
// === Node ===
async fn get_node_status(
&self,
request: Request<Empty>,
) -> Result<Response<NodeStatus>, Status> {
self.check_auth(&request)?;
let servers = self.server_manager.list_servers().await;
let active = servers
.iter()
.filter(|s| s.state.to_string() == "running")
.count();
Ok(Response::new(NodeStatus {
version: env!("CARGO_PKG_VERSION").to_string(),
is_healthy: true,
uptime_seconds: self.start_time.elapsed().as_secs() as i64,
active_servers: active as i32,
}))
}
type StreamNodeStatsStream = GrpcStream<NodeStats>;
async fn stream_node_stats(
&self,
request: Request<Empty>,
) -> Result<Response<Self::StreamNodeStatsStream>, Status> {
self.check_auth(&request)?;
let (tx, rx) = tokio::sync::mpsc::channel(32);
tokio::spawn(async move {
loop {
// Read system stats
let stats = NodeStats {
cpu_percent: 0.0, // TODO: real system stats
memory_used: 0,
memory_total: 0,
disk_used: 0,
disk_total: 0,
};
if tx.send(Ok(stats)).await.is_err() {
break;
}
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
// === Server Lifecycle ===
async fn create_server(
&self,
request: Request<CreateServerRequest>,
) -> Result<Response<ServerResponse>, Status> {
self.check_auth(&request)?;
let req = request.into_inner();
let ports: Vec<PortMap> = req
.ports
.iter()
.map(|p| PortMap {
host_port: p.host_port as u16,
container_port: p.container_port as u16,
protocol: if p.protocol.is_empty() {
"tcp".to_string()
} else {
p.protocol.clone()
},
})
.collect();
self.server_manager
.create_server(
req.uuid.clone(),
req.docker_image,
req.memory_limit,
req.disk_limit,
req.cpu_limit,
req.startup_command,
req.environment,
ports,
)
.await
.map_err(|e| Status::from(e))?;
Ok(Response::new(ServerResponse {
uuid: req.uuid,
status: "installing".to_string(),
}))
}
async fn delete_server(
&self,
request: Request<ServerIdentifier>,
) -> Result<Response<Empty>, Status> {
self.check_auth(&request)?;
let uuid = request.into_inner().uuid;
self.server_manager
.delete_server(&uuid)
.await
.map_err(Status::from)?;
Ok(Response::new(Empty {}))
}
async fn reinstall_server(
&self,
request: Request<ServerIdentifier>,
) -> Result<Response<Empty>, Status> {
self.check_auth(&request)?;
let uuid = request.into_inner().uuid;
// Stop and remove, then recreate
let _ = self.server_manager.kill_server(&uuid).await;
// TODO: full reinstall logic
info!(uuid = %uuid, "Reinstall requested (not yet fully implemented)");
Ok(Response::new(Empty {}))
}
// === Power ===
async fn set_power_state(
&self,
request: Request<PowerRequest>,
) -> Result<Response<Empty>, Status> {
self.check_auth(&request)?;
let req = request.into_inner();
match req.action() {
PowerAction::Start => {
self.server_manager.start_server(&req.uuid).await.map_err(Status::from)?;
}
PowerAction::Stop => {
self.server_manager.stop_server(&req.uuid).await.map_err(Status::from)?;
}
PowerAction::Restart => {
let _ = self.server_manager.stop_server(&req.uuid).await;
self.server_manager.start_server(&req.uuid).await.map_err(Status::from)?;
}
PowerAction::Kill => {
self.server_manager.kill_server(&req.uuid).await.map_err(Status::from)?;
}
}
Ok(Response::new(Empty {}))
}
async fn get_server_status(
&self,
request: Request<ServerIdentifier>,
) -> Result<Response<pb::ServerStatus>, Status> {
self.check_auth(&request)?;
let uuid = request.into_inner().uuid;
let spec = self.server_manager.get_server(&uuid).await.map_err(Status::from)?;
Ok(Response::new(pb::ServerStatus {
uuid: spec.uuid,
state: spec.state.to_string(),
cpu_percent: 0.0,
memory_bytes: 0,
disk_bytes: 0,
network_rx: 0,
network_tx: 0,
uptime_seconds: 0,
}))
}
// === Console ===
type StreamConsoleStream = GrpcStream<ConsoleOutput>;
async fn stream_console(
&self,
request: Request<ServerIdentifier>,
) -> Result<Response<Self::StreamConsoleStream>, Status> {
self.check_auth(&request)?;
let uuid = request.into_inner().uuid;
// Verify server exists
let _ = self.server_manager.get_server(&uuid).await.map_err(Status::from)?;
let (tx, rx) = tokio::sync::mpsc::channel(256);
let docker = self.server_manager.docker().clone();
let uuid_clone = uuid.clone();
tokio::spawn(async move {
let mut stream = docker.stream_logs(&uuid_clone);
while let Some(result) = stream.next().await {
match result {
Ok(line) => {
let output = ConsoleOutput {
uuid: uuid_clone.clone(),
line,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64,
};
if tx.send(Ok(output)).await.is_err() {
break;
}
}
Err(e) => {
error!(error = %e, "Console stream error");
break;
}
}
}
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
async fn send_command(
&self,
request: Request<CommandRequest>,
) -> Result<Response<Empty>, Status> {
self.check_auth(&request)?;
let req = request.into_inner();
self.server_manager
.docker()
.send_command(&req.uuid, &req.command)
.await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(Empty {}))
}
// === Files ===
async fn list_files(
&self,
request: Request<FileListRequest>,
) -> Result<Response<FileListResponse>, Status> {
self.check_auth(&request)?;
let req = request.into_inner();
let fs = self.get_fs(&req.uuid);
let entries = fs
.list_files(&req.path)
.await
.map_err(|e| Status::from(e))?;
let files = entries
.into_iter()
.map(|e| FileEntry {
name: e.name,
path: e.path,
is_directory: e.is_directory,
size: e.size,
modified_at: e.modified_at,
mime_type: String::new(),
})
.collect();
Ok(Response::new(FileListResponse { files }))
}
async fn read_file(
&self,
request: Request<FileReadRequest>,
) -> Result<Response<FileContent>, Status> {
self.check_auth(&request)?;
let req = request.into_inner();
let fs = self.get_fs(&req.uuid);
let data = fs.read_file(&req.path).await.map_err(Status::from)?;
Ok(Response::new(FileContent {
data,
mime_type: String::new(),
}))
}
async fn write_file(
&self,
request: Request<FileWriteRequest>,
) -> Result<Response<Empty>, Status> {
self.check_auth(&request)?;
let req = request.into_inner();
let fs = self.get_fs(&req.uuid);
fs.write_file(&req.path, &req.data)
.await
.map_err(Status::from)?;
Ok(Response::new(Empty {}))
}
async fn delete_files(
&self,
request: Request<FileDeleteRequest>,
) -> Result<Response<Empty>, Status> {
self.check_auth(&request)?;
let req = request.into_inner();
let fs = self.get_fs(&req.uuid);
fs.delete_paths(&req.paths).await.map_err(Status::from)?;
Ok(Response::new(Empty {}))
}
async fn compress_files(
&self,
request: Request<CompressRequest>,
) -> Result<Response<FileContent>, Status> {
self.check_auth(&request)?;
// TODO: implement compression
Err(Status::unimplemented("Not yet implemented"))
}
async fn decompress_file(
&self,
request: Request<DecompressRequest>,
) -> Result<Response<Empty>, Status> {
self.check_auth(&request)?;
// TODO: implement decompression
Err(Status::unimplemented("Not yet implemented"))
}
// === Backup ===
async fn create_backup(
&self,
request: Request<BackupRequest>,
) -> Result<Response<BackupResponse>, Status> {
self.check_auth(&request)?;
// TODO: implement backup creation
Err(Status::unimplemented("Not yet implemented"))
}
async fn restore_backup(
&self,
request: Request<RestoreBackupRequest>,
) -> Result<Response<Empty>, Status> {
self.check_auth(&request)?;
// TODO: implement backup restoration
Err(Status::unimplemented("Not yet implemented"))
}
async fn delete_backup(
&self,
request: Request<BackupIdentifier>,
) -> Result<Response<Empty>, Status> {
self.check_auth(&request)?;
// TODO: implement backup deletion
Err(Status::unimplemented("Not yet implemented"))
}
// === Stats ===
type StreamServerStatsStream = GrpcStream<ServerResourceStats>;
async fn stream_server_stats(
&self,
request: Request<ServerIdentifier>,
) -> Result<Response<Self::StreamServerStatsStream>, Status> {
self.check_auth(&request)?;
let uuid = request.into_inner().uuid;
let _ = self.server_manager.get_server(&uuid).await.map_err(Status::from)?;
let (tx, rx) = tokio::sync::mpsc::channel(32);
let docker = self.server_manager.docker().clone();
let uuid_clone = uuid.clone();
tokio::spawn(async move {
loop {
match docker.container_stats(&uuid_clone).await {
Ok(stats) => {
let cpu = calculate_cpu_percent(&stats);
let memory = stats.memory_stats.usage.unwrap_or(0) as i64;
let resource_stats = ServerResourceStats {
uuid: uuid_clone.clone(),
cpu_percent: cpu,
memory_bytes: memory,
disk_bytes: 0,
network_rx: 0,
network_tx: 0,
state: "running".to_string(),
};
if tx.send(Ok(resource_stats)).await.is_err() {
break;
}
}
Err(_) => break,
}
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
// === Install Progress ===
type StreamInstallProgressStream = GrpcStream<InstallProgress>;
async fn stream_install_progress(
&self,
request: Request<ServerIdentifier>,
) -> Result<Response<Self::StreamInstallProgressStream>, Status> {
self.check_auth(&request)?;
// TODO: implement install progress streaming
let (_tx, rx) = tokio::sync::mpsc::channel(8);
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
// === Players ===
async fn get_active_players(
&self,
request: Request<ServerIdentifier>,
) -> Result<Response<PlayerList>, Status> {
self.check_auth(&request)?;
// TODO: implement game-specific player queries (RCON)
Ok(Response::new(PlayerList {
players: vec![],
max_players: 0,
}))
}
}
/// Calculate CPU percentage from Docker stats.
fn calculate_cpu_percent(stats: &bollard::container::Stats) -> f64 {
let cpu_delta = stats.cpu_stats.cpu_usage.total_usage as f64
- stats.precpu_stats.cpu_usage.total_usage as f64;
let system_delta = stats.cpu_stats.system_cpu_usage.unwrap_or(0) as f64
- stats.precpu_stats.system_cpu_usage.unwrap_or(0) as f64;
let num_cpus = stats
.cpu_stats
.online_cpus
.unwrap_or(1) as f64;
if system_delta > 0.0 && cpu_delta >= 0.0 {
(cpu_delta / system_delta) * num_cpus * 100.0
} else {
0.0
}
}
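
A worked example of the formula with hypothetical sample values (not taken from a real container):

#[test]
fn cpu_percent_formula_example() {
    let cpu_delta = 400_000_000.0_f64; // container CPU ns consumed since the last sample
    let system_delta = 10_000_000_000.0_f64; // host CPU ns elapsed over the same window
    let num_cpus = 4.0_f64;
    let percent = (cpu_delta / system_delta) * num_cpus * 100.0;
    // 4% of total host CPU, scaled by 4 online cores => 16%.
    assert!((percent - 16.0).abs() < 1e-9);
}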


@ -1,8 +1,21 @@
use std::sync::Arc;
use anyhow::Result;
use tonic::transport::Server;
use tracing::info;
use tracing_subscriber::EnvFilter;
mod auth;
mod config;
mod docker;
mod error;
mod filesystem;
mod grpc;
mod server;
use crate::docker::DockerManager;
use crate::grpc::DaemonServiceImpl;
use crate::grpc::service::pb::daemon_service_server::DaemonServiceServer;
use crate::server::ServerManager;
#[tokio::main]
async fn main() -> Result<()> {
@ -13,20 +26,91 @@ async fn main() -> Result<()> {
)
.init();
info!("GamePanel Daemon v{} starting...", env!("CARGO_PKG_VERSION"));
// Load config
let config = config::DaemonConfig::load()?;
info!(grpc_port = config.grpc_port, "Configuration loaded");
// Initialize Docker
let docker = Arc::new(DockerManager::new(&config.docker).await?);
info!("Docker manager initialized");
// Initialize server manager
let server_manager = Arc::new(ServerManager::new(docker, &config));
info!("Server manager initialized");
// Create gRPC service
let daemon_service = DaemonServiceImpl::new(
server_manager.clone(),
config.node_token.clone(),
);
// Start gRPC server
let addr = format!("0.0.0.0:{}", config.grpc_port).parse()?;
info!(addr = %addr, "Starting gRPC server");
// Heartbeat task
let api_url = config.api_url.clone();
let node_token = config.node_token.clone();
let sm = server_manager.clone();
tokio::spawn(async move {
heartbeat_loop(&api_url, &node_token, sm).await;
});
// Start serving
Server::builder()
.add_service(DaemonServiceServer::new(daemon_service))
.serve_with_shutdown(addr, async {
tokio::signal::ctrl_c().await.ok();
info!("Shutdown signal received");
})
.await?;
info!("GamePanel Daemon stopped");
Ok(())
}
/// Periodically report node status to the panel API.
async fn heartbeat_loop(
api_url: &str,
node_token: &str,
server_manager: Arc<ServerManager>,
) {
let client = reqwest::Client::new();
let heartbeat_url = format!("{}/api/nodes/heartbeat", api_url);
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
let servers = server_manager.list_servers().await;
let active = servers
.iter()
.filter(|s| s.state.to_string() == "running")
.count();
let payload = serde_json::json!({
"active_servers": active,
"total_servers": servers.len(),
"version": env!("CARGO_PKG_VERSION"),
});
match client
.post(&heartbeat_url)
.bearer_auth(node_token)
.json(&payload)
.send()
.await
{
Ok(resp) if resp.status().is_success() => {
tracing::debug!("Heartbeat sent successfully");
}
Ok(resp) => {
tracing::warn!(status = %resp.status(), "Heartbeat failed");
}
Err(e) => {
tracing::warn!(error = %e, "Heartbeat request failed");
}
}
}
}
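
For orientation, a hypothetical shape of DaemonConfig inferred from the fields used above; the real definition lives in the existing config module, which is not part of this diff, so the actual struct may differ.

// Hypothetical sketch only; field names are taken from their usages in main.rs,
// server/manager.rs, and docker/manager.rs.
use std::path::PathBuf;

pub struct DaemonConfig {
    pub grpc_port: u16,      // gRPC listen port, bound as 0.0.0.0:<port>
    pub api_url: String,     // panel base URL used by the heartbeat loop
    pub node_token: String,  // bearer token shared with the panel
    pub data_path: PathBuf,  // root directory for per-server data
    pub docker: DockerConfig,
}

pub struct DockerConfig {
    pub socket: String,         // Docker socket path, e.g. /var/run/docker.sock
    pub network: String,        // bridge network created for game servers
    pub network_subnet: String, // subnet handed to the network's IPAM config
}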


@ -0,0 +1,230 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{info, error, warn};
use anyhow::Result;
use crate::config::DaemonConfig;
use crate::docker::DockerManager;
use crate::error::DaemonError;
use super::state::{ServerState, ServerSpec, PortMap};
/// Manages all game server instances on this node.
pub struct ServerManager {
servers: Arc<RwLock<HashMap<String, ServerSpec>>>,
docker: Arc<DockerManager>,
data_root: PathBuf,
}
impl ServerManager {
pub fn new(docker: Arc<DockerManager>, config: &DaemonConfig) -> Self {
Self {
servers: Arc::new(RwLock::new(HashMap::new())),
docker,
data_root: config.data_path.clone(),
}
}
/// Get server spec by UUID.
pub async fn get_server(&self, uuid: &str) -> Result<ServerSpec, DaemonError> {
let servers = self.servers.read().await;
servers
.get(uuid)
.cloned()
.ok_or_else(|| DaemonError::ServerNotFound(uuid.to_string()))
}
/// Get all servers.
pub async fn list_servers(&self) -> Vec<ServerSpec> {
let servers = self.servers.read().await;
servers.values().cloned().collect()
}
/// Create a new game server.
pub async fn create_server(
&self,
uuid: String,
docker_image: String,
memory_limit: i64,
disk_limit: i64,
cpu_limit: i32,
startup_command: String,
environment: HashMap<String, String>,
ports: Vec<PortMap>,
) -> Result<(), DaemonError> {
let mut servers = self.servers.write().await;
if servers.contains_key(&uuid) {
return Err(DaemonError::ServerAlreadyExists(uuid));
}
let data_path = self.data_root.join(&uuid);
// Create data directory
tokio::fs::create_dir_all(&data_path)
.await
.map_err(DaemonError::Io)?;
let spec = ServerSpec {
uuid: uuid.clone(),
docker_image,
memory_limit,
disk_limit,
cpu_limit,
startup_command,
environment,
ports,
data_path,
state: ServerState::Installing,
container_id: None,
};
servers.insert(uuid.clone(), spec);
drop(servers);
// Install server in background
let docker = self.docker.clone();
let servers_ref = self.servers.clone();
tokio::spawn(async move {
if let Err(e) = Self::install_server(docker, servers_ref.clone(), &uuid).await {
error!(uuid = %uuid, error = %e, "Server installation failed");
let mut servers = servers_ref.write().await;
if let Some(spec) = servers.get_mut(&uuid) {
spec.state = ServerState::Error;
}
}
});
Ok(())
}
/// Install a server: pull image, create container.
async fn install_server(
docker: Arc<DockerManager>,
servers: Arc<RwLock<HashMap<String, ServerSpec>>>,
uuid: &str,
) -> Result<()> {
info!(uuid = %uuid, "Starting server installation");
let spec = {
let s = servers.read().await;
s.get(uuid).cloned().ok_or_else(|| anyhow::anyhow!("Server not found"))?
};
// Pull image
docker.pull_image(&spec.docker_image).await?;
// Create container
let container_id = docker.create_container(&spec).await?;
// Update state
let mut s = servers.write().await;
if let Some(server) = s.get_mut(uuid) {
server.container_id = Some(container_id);
server.state = ServerState::Stopped;
}
info!(uuid = %uuid, "Server installation complete");
Ok(())
}
/// Start a server.
pub async fn start_server(&self, uuid: &str) -> Result<(), DaemonError> {
let mut servers = self.servers.write().await;
let spec = servers
.get_mut(uuid)
.ok_or_else(|| DaemonError::ServerNotFound(uuid.to_string()))?;
if !spec.can_transition_to(&ServerState::Starting) {
return Err(DaemonError::InvalidStateTransition {
current: spec.state.to_string(),
requested: "starting".to_string(),
});
}
spec.state = ServerState::Starting;
drop(servers);
self.docker.start_container(uuid).await.map_err(|e| {
DaemonError::Internal(format!("Failed to start container: {}", e))
})?;
let mut servers = self.servers.write().await;
if let Some(spec) = servers.get_mut(uuid) {
spec.state = ServerState::Running;
}
Ok(())
}
/// Stop a server.
pub async fn stop_server(&self, uuid: &str) -> Result<(), DaemonError> {
let mut servers = self.servers.write().await;
let spec = servers
.get_mut(uuid)
.ok_or_else(|| DaemonError::ServerNotFound(uuid.to_string()))?;
if !spec.can_transition_to(&ServerState::Stopping) {
return Err(DaemonError::InvalidStateTransition {
current: spec.state.to_string(),
requested: "stopping".to_string(),
});
}
spec.state = ServerState::Stopping;
drop(servers);
self.docker.stop_container(uuid, 30).await.map_err(|e| {
DaemonError::Internal(format!("Failed to stop container: {}", e))
})?;
let mut servers = self.servers.write().await;
if let Some(spec) = servers.get_mut(uuid) {
spec.state = ServerState::Stopped;
}
Ok(())
}
/// Kill a server immediately.
pub async fn kill_server(&self, uuid: &str) -> Result<(), DaemonError> {
self.docker.kill_container(uuid).await.map_err(|e| {
DaemonError::Internal(format!("Failed to kill container: {}", e))
})?;
let mut servers = self.servers.write().await;
if let Some(spec) = servers.get_mut(uuid) {
spec.state = ServerState::Stopped;
}
Ok(())
}
/// Delete a server and clean up.
pub async fn delete_server(&self, uuid: &str) -> Result<(), DaemonError> {
// Remove container if it exists
if let Err(e) = self.docker.remove_container(uuid).await {
warn!(uuid = %uuid, error = %e, "Failed to remove container (may not exist)");
}
// Remove from state
let mut servers = self.servers.write().await;
servers.remove(uuid);
// Note: data directory is NOT deleted here for safety.
// Admin should explicitly clean up via API or manually.
info!(uuid = %uuid, "Server deleted");
Ok(())
}
/// Get the Docker manager Arc.
pub fn docker(&self) -> &Arc<DockerManager> {
&self.docker
}
/// Get the data root path.
pub fn data_root(&self) -> &PathBuf {
&self.data_root
}
}


@ -0,0 +1,5 @@
pub mod state;
pub mod manager;
pub use state::{ServerSpec, PortMap};
pub use manager::ServerManager;


@ -0,0 +1,69 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum ServerState {
Installing,
Stopped,
Starting,
Running,
Stopping,
Error,
}
impl std::fmt::Display for ServerState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Installing => write!(f, "installing"),
Self::Stopped => write!(f, "stopped"),
Self::Starting => write!(f, "starting"),
Self::Running => write!(f, "running"),
Self::Stopping => write!(f, "stopping"),
Self::Error => write!(f, "error"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PortMap {
pub host_port: u16,
pub container_port: u16,
pub protocol: String, // "tcp" or "udp"
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerSpec {
pub uuid: String,
pub docker_image: String,
pub memory_limit: i64, // bytes
pub disk_limit: i64, // bytes
pub cpu_limit: i32, // percentage (100 = 1 core)
pub startup_command: String,
pub environment: HashMap<String, String>,
pub ports: Vec<PortMap>,
pub data_path: PathBuf,
pub state: ServerState,
pub container_id: Option<String>,
}
impl ServerSpec {
/// Check if the server can transition to the requested state.
pub fn can_transition_to(&self, target: &ServerState) -> bool {
matches!(
(&self.state, target),
(ServerState::Installing, ServerState::Stopped)
| (ServerState::Installing, ServerState::Error)
| (ServerState::Stopped, ServerState::Starting)
| (ServerState::Starting, ServerState::Running)
| (ServerState::Starting, ServerState::Error)
| (ServerState::Running, ServerState::Stopping)
| (ServerState::Running, ServerState::Error)
| (ServerState::Stopping, ServerState::Stopped)
| (ServerState::Stopping, ServerState::Error)
| (ServerState::Error, ServerState::Starting)
| (ServerState::Error, ServerState::Stopped)
)
}
}
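
A sketch of a test exercising the transition table (hypothetical, not part of this commit):

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::path::PathBuf;

    fn spec(state: ServerState) -> ServerSpec {
        ServerSpec {
            uuid: "test".into(),
            docker_image: "alpine:3".into(),
            memory_limit: 0,
            disk_limit: 0,
            cpu_limit: 100,
            startup_command: String::new(),
            environment: HashMap::new(),
            ports: vec![],
            data_path: PathBuf::new(),
            state,
            container_id: None,
        }
    }

    #[test]
    fn stopped_can_start_but_not_run_directly() {
        assert!(spec(ServerState::Stopped).can_transition_to(&ServerState::Starting));
        assert!(!spec(ServerState::Stopped).can_transition_to(&ServerState::Running));
        assert!(spec(ServerState::Running).can_transition_to(&ServerState::Stopping));
    }
}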