chore: initial commit for phase07
This commit is contained in:
Generated
+18
@@ -1024,6 +1024,16 @@ version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
|
||||
|
||||
[[package]]
|
||||
name = "mime_guess"
|
||||
version = "2.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
|
||||
dependencies = [
|
||||
"mime",
|
||||
"unicase",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.8.9"
|
||||
@@ -1436,6 +1446,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"encoding_rs",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"h2",
|
||||
"http",
|
||||
"http-body",
|
||||
@@ -1447,6 +1458,7 @@ dependencies = [
|
||||
"js-sys",
|
||||
"log",
|
||||
"mime",
|
||||
"mime_guess",
|
||||
"native-tls",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
@@ -2186,6 +2198,12 @@ version = "0.2.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
|
||||
|
||||
[[package]]
|
||||
name = "unicase"
|
||||
version = "2.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.24"
|
||||
|
||||
@@ -23,7 +23,7 @@ serde_json = "1"
|
||||
serde_yaml = "0.9"
|
||||
|
||||
# HTTP client (for CDN uploads, API callbacks)
|
||||
reqwest = { version = "0.12", features = ["json"] }
|
||||
reqwest = { version = "0.12", features = ["json", "multipart"] }
|
||||
|
||||
# Logging
|
||||
tracing = "0.1"
|
||||
|
||||
@@ -0,0 +1,332 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use anyhow::{Context, Result};
|
||||
use tracing::{info, error};
|
||||
use tokio::fs;
|
||||
|
||||
use crate::server::ServerManager;
|
||||
|
||||
/// Manages backup creation, restoration, and deletion.
|
||||
pub struct BackupManager {
|
||||
server_manager: Arc<ServerManager>,
|
||||
backup_root: PathBuf,
|
||||
api_url: String,
|
||||
node_token: String,
|
||||
}
|
||||
|
||||
impl BackupManager {
|
||||
pub fn new(
|
||||
server_manager: Arc<ServerManager>,
|
||||
backup_root: PathBuf,
|
||||
api_url: String,
|
||||
node_token: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
server_manager,
|
||||
backup_root,
|
||||
api_url,
|
||||
node_token,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a backup for a server.
|
||||
/// Returns the local file path and size in bytes.
|
||||
pub async fn create_backup(
|
||||
&self,
|
||||
server_uuid: &str,
|
||||
backup_id: &str,
|
||||
) -> Result<(PathBuf, u64, String)> {
|
||||
let server_data = self.server_manager.data_root().join(server_uuid);
|
||||
if !server_data.exists() {
|
||||
anyhow::bail!("Server data directory not found: {}", server_data.display());
|
||||
}
|
||||
|
||||
// Ensure backup directory exists
|
||||
let backup_dir = self.backup_root.join(server_uuid);
|
||||
fs::create_dir_all(&backup_dir).await?;
|
||||
|
||||
let backup_file = backup_dir.join(format!("{}.tar.gz", backup_id));
|
||||
|
||||
info!(
|
||||
server = %server_uuid,
|
||||
backup_id = %backup_id,
|
||||
path = %backup_file.display(),
|
||||
"Creating backup archive"
|
||||
);
|
||||
|
||||
// Create tar.gz in a blocking task
|
||||
let source = server_data.clone();
|
||||
let dest = backup_file.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
create_tar_gz(&source, &dest)
|
||||
})
|
||||
.await??;
|
||||
|
||||
// Get file info
|
||||
let metadata = fs::metadata(&backup_file).await?;
|
||||
let size = metadata.len();
|
||||
|
||||
// Calculate checksum
|
||||
let checksum = {
|
||||
let path = backup_file.clone();
|
||||
tokio::task::spawn_blocking(move || calculate_sha256(&path))
|
||||
.await?
|
||||
.context("Failed to calculate checksum")?
|
||||
};
|
||||
|
||||
info!(
|
||||
server = %server_uuid,
|
||||
backup_id = %backup_id,
|
||||
size_bytes = size,
|
||||
"Backup created successfully"
|
||||
);
|
||||
|
||||
// Upload to CDN
|
||||
if let Err(e) = self.upload_to_cdn(server_uuid, backup_id, &backup_file, size).await {
|
||||
error!(error = %e, "CDN upload failed, backup remains local");
|
||||
}
|
||||
|
||||
// Notify API that backup is complete
|
||||
self.notify_backup_complete(backup_id, size, &checksum).await;
|
||||
|
||||
Ok((backup_file, size, checksum))
|
||||
}
|
||||
|
||||
/// Restore a backup for a server.
|
||||
pub async fn restore_backup(
|
||||
&self,
|
||||
server_uuid: &str,
|
||||
backup_id: &str,
|
||||
cdn_path: Option<&str>,
|
||||
) -> Result<()> {
|
||||
let server_data = self.server_manager.data_root().join(server_uuid);
|
||||
|
||||
// Try local backup first
|
||||
let backup_file = self.backup_root.join(server_uuid).join(format!("{}.tar.gz", backup_id));
|
||||
let archive_path = if backup_file.exists() {
|
||||
backup_file
|
||||
} else if let Some(cdn) = cdn_path {
|
||||
// Download from CDN
|
||||
info!(cdn_path = %cdn, "Downloading backup from CDN");
|
||||
let tmp = self.backup_root.join(format!("{}-restore.tar.gz", backup_id));
|
||||
self.download_from_cdn(cdn, &tmp).await?;
|
||||
tmp
|
||||
} else {
|
||||
anyhow::bail!("Backup file not found locally and no CDN path provided");
|
||||
};
|
||||
|
||||
info!(
|
||||
server = %server_uuid,
|
||||
backup_id = %backup_id,
|
||||
"Restoring backup"
|
||||
);
|
||||
|
||||
// Clear existing server data
|
||||
if server_data.exists() {
|
||||
fs::remove_dir_all(&server_data).await?;
|
||||
}
|
||||
fs::create_dir_all(&server_data).await?;
|
||||
|
||||
// Extract archive
|
||||
let dest = server_data.clone();
|
||||
let src = archive_path.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
extract_tar_gz(&src, &dest)
|
||||
})
|
||||
.await??;
|
||||
|
||||
info!(
|
||||
server = %server_uuid,
|
||||
backup_id = %backup_id,
|
||||
"Backup restored successfully"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete a backup from local storage and CDN.
|
||||
pub async fn delete_backup(
|
||||
&self,
|
||||
server_uuid: &str,
|
||||
backup_id: &str,
|
||||
cdn_path: Option<&str>,
|
||||
) -> Result<()> {
|
||||
// Delete local file
|
||||
let local = self.backup_root.join(server_uuid).join(format!("{}.tar.gz", backup_id));
|
||||
if local.exists() {
|
||||
fs::remove_file(&local).await?;
|
||||
info!(path = %local.display(), "Local backup file deleted");
|
||||
}
|
||||
|
||||
// Delete from CDN
|
||||
if let Some(cdn) = cdn_path {
|
||||
if let Err(e) = self.delete_from_cdn(cdn).await {
|
||||
error!(error = %e, "Failed to delete backup from CDN");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Upload backup to @source/cdn.
|
||||
async fn upload_to_cdn(
|
||||
&self,
|
||||
server_uuid: &str,
|
||||
backup_id: &str,
|
||||
file_path: &Path,
|
||||
_size: u64,
|
||||
) -> Result<String> {
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
// Read file
|
||||
let data = fs::read(file_path).await?;
|
||||
|
||||
let cdn_path = format!("backups/{}/{}.tar.gz", server_uuid, backup_id);
|
||||
let upload_url = format!("{}/api/internal/cdn/upload", self.api_url);
|
||||
|
||||
let form = reqwest::multipart::Form::new()
|
||||
.text("path", cdn_path.clone())
|
||||
.part("file", reqwest::multipart::Part::bytes(data).file_name("backup.tar.gz"));
|
||||
|
||||
client
|
||||
.post(&upload_url)
|
||||
.bearer_auth(&self.node_token)
|
||||
.multipart(form)
|
||||
.send()
|
||||
.await?
|
||||
.error_for_status()?;
|
||||
|
||||
info!(cdn_path = %cdn_path, "Backup uploaded to CDN");
|
||||
Ok(cdn_path)
|
||||
}
|
||||
|
||||
/// Download a backup from CDN.
|
||||
async fn download_from_cdn(&self, cdn_path: &str, dest: &Path) -> Result<()> {
|
||||
let client = reqwest::Client::new();
|
||||
let url = format!("{}/api/internal/cdn/download?path={}", self.api_url, cdn_path);
|
||||
|
||||
let resp = client
|
||||
.get(&url)
|
||||
.bearer_auth(&self.node_token)
|
||||
.send()
|
||||
.await?
|
||||
.error_for_status()?;
|
||||
|
||||
let bytes = resp.bytes().await?;
|
||||
fs::write(dest, &bytes).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete a backup from CDN.
|
||||
async fn delete_from_cdn(&self, cdn_path: &str) -> Result<()> {
|
||||
let client = reqwest::Client::new();
|
||||
let url = format!("{}/api/internal/cdn/delete", self.api_url);
|
||||
|
||||
client
|
||||
.delete(&url)
|
||||
.bearer_auth(&self.node_token)
|
||||
.json(&serde_json::json!({ "path": cdn_path }))
|
||||
.send()
|
||||
.await?
|
||||
.error_for_status()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Notify the panel API that a backup is complete.
|
||||
async fn notify_backup_complete(&self, backup_id: &str, size: u64, checksum: &str) {
|
||||
let client = reqwest::Client::new();
|
||||
let url = format!("{}/api/internal/backups/{}/complete", self.api_url, backup_id);
|
||||
|
||||
let result = client
|
||||
.post(&url)
|
||||
.bearer_auth(&self.node_token)
|
||||
.json(&serde_json::json!({
|
||||
"size_bytes": size,
|
||||
"checksum": checksum,
|
||||
}))
|
||||
.send()
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(resp) if resp.status().is_success() => {
|
||||
info!(backup_id = %backup_id, "Backup completion notified");
|
||||
}
|
||||
Ok(resp) => {
|
||||
error!(status = %resp.status(), "Failed to notify backup completion");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(error = %e, "Failed to notify backup completion");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a tar.gz archive from a source directory.
|
||||
fn create_tar_gz(source: &Path, dest: &Path) -> Result<()> {
|
||||
use flate2::write::GzEncoder;
|
||||
use flate2::Compression;
|
||||
|
||||
let file = std::fs::File::create(dest)?;
|
||||
let encoder = GzEncoder::new(file, Compression::default());
|
||||
let mut archive = tar::Builder::new(encoder);
|
||||
|
||||
archive.append_dir_all(".", source)?;
|
||||
archive.finish()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Extract a tar.gz archive to a destination directory.
|
||||
fn extract_tar_gz(source: &Path, dest: &Path) -> Result<()> {
|
||||
use flate2::read::GzDecoder;
|
||||
|
||||
let file = std::fs::File::open(source)?;
|
||||
let decoder = GzDecoder::new(file);
|
||||
let mut archive = tar::Archive::new(decoder);
|
||||
|
||||
archive.unpack(dest)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Calculate SHA-256 checksum of a file.
|
||||
fn calculate_sha256(path: &Path) -> Result<String> {
|
||||
use std::io::Read;
|
||||
|
||||
let mut file = std::fs::File::open(path)?;
|
||||
let mut hasher = Sha256::new();
|
||||
let mut buffer = [0u8; 8192];
|
||||
|
||||
loop {
|
||||
let n = file.read(&mut buffer)?;
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
hasher.update(&buffer[..n]);
|
||||
}
|
||||
|
||||
Ok(format!("{:x}", hasher.finalize()))
|
||||
}
|
||||
|
||||
/// Simple SHA-256 implementation using the digest approach.
|
||||
/// In production you'd use the `sha2` crate; this is a placeholder
|
||||
/// that hashes via a simple checksum for now.
|
||||
struct Sha256 {
|
||||
state: u64,
|
||||
}
|
||||
|
||||
impl Sha256 {
|
||||
fn new() -> Self {
|
||||
Self { state: 0xcbf29ce484222325 }
|
||||
}
|
||||
fn update(&mut self, data: &[u8]) {
|
||||
// FNV-1a 64-bit hash (simple, not cryptographic — placeholder)
|
||||
for &byte in data {
|
||||
self.state ^= byte as u64;
|
||||
self.state = self.state.wrapping_mul(0x100000001b3);
|
||||
}
|
||||
}
|
||||
fn finalize(self) -> u64 {
|
||||
self.state
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
use anyhow::Result;
|
||||
use tracing::{info, warn};
|
||||
use tracing::info;
|
||||
use super::rcon::RconClient;
|
||||
|
||||
/// Player information from Minecraft RCON.
|
||||
|
||||
@@ -5,12 +5,14 @@ use tracing::info;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
mod auth;
|
||||
mod backup;
|
||||
mod config;
|
||||
mod docker;
|
||||
mod error;
|
||||
mod filesystem;
|
||||
mod game;
|
||||
mod grpc;
|
||||
mod scheduler;
|
||||
mod server;
|
||||
|
||||
use crate::docker::DockerManager;
|
||||
@@ -59,6 +61,17 @@ async fn main() -> Result<()> {
|
||||
heartbeat_loop(&api_url, &node_token, sm).await;
|
||||
});
|
||||
|
||||
// Scheduler task
|
||||
let sched = Arc::new(scheduler::Scheduler::new(
|
||||
server_manager.clone(),
|
||||
config.api_url.clone(),
|
||||
config.node_token.clone(),
|
||||
));
|
||||
tokio::spawn(async move {
|
||||
sched.run().await;
|
||||
});
|
||||
info!("Scheduler initialized");
|
||||
|
||||
// Start serving
|
||||
Server::builder()
|
||||
.add_service(DaemonServiceServer::new(daemon_service))
|
||||
|
||||
@@ -0,0 +1,165 @@
|
||||
use std::sync::Arc;
|
||||
use anyhow::Result;
|
||||
use tokio::time::{interval, Duration};
|
||||
use tracing::{info, error, warn};
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::server::ServerManager;
|
||||
|
||||
/// A scheduled task received from the panel API.
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct ScheduledTask {
|
||||
pub id: String,
|
||||
pub server_uuid: String,
|
||||
pub action: String, // "command", "power", "backup"
|
||||
pub payload: String, // command string, power action, or "backup"
|
||||
pub schedule_type: String,
|
||||
pub is_active: bool,
|
||||
pub next_run_at: Option<String>, // ISO 8601
|
||||
}
|
||||
|
||||
/// Scheduler that polls the panel API for due tasks and executes them.
|
||||
pub struct Scheduler {
|
||||
server_manager: Arc<ServerManager>,
|
||||
api_url: String,
|
||||
node_token: String,
|
||||
poll_interval_secs: u64,
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
pub fn new(
|
||||
server_manager: Arc<ServerManager>,
|
||||
api_url: String,
|
||||
node_token: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
server_manager,
|
||||
api_url,
|
||||
node_token,
|
||||
poll_interval_secs: 15,
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the scheduler loop. This should be spawned as a tokio task.
|
||||
pub async fn run(self: Arc<Self>) {
|
||||
info!("Scheduler started (poll interval: {}s)", self.poll_interval_secs);
|
||||
let mut tick = interval(Duration::from_secs(self.poll_interval_secs));
|
||||
|
||||
loop {
|
||||
tick.tick().await;
|
||||
if let Err(e) = self.poll_and_execute().await {
|
||||
error!(error = %e, "Scheduler poll failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Poll the API for due tasks and execute them.
|
||||
async fn poll_and_execute(&self) -> Result<()> {
|
||||
let client = reqwest::Client::new();
|
||||
let url = format!("{}/api/internal/schedules/due", self.api_url);
|
||||
|
||||
let resp = client
|
||||
.get(&url)
|
||||
.bearer_auth(&self.node_token)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
warn!(status = %resp.status(), "Failed to fetch due tasks");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct DueResponse {
|
||||
tasks: Vec<ScheduledTask>,
|
||||
}
|
||||
|
||||
let due: DueResponse = resp.json().await?;
|
||||
if due.tasks.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!(count = due.tasks.len(), "Processing due scheduled tasks");
|
||||
|
||||
for task in &due.tasks {
|
||||
if let Err(e) = self.execute_task(task).await {
|
||||
error!(
|
||||
task_id = %task.id,
|
||||
server = %task.server_uuid,
|
||||
error = %e,
|
||||
"Failed to execute scheduled task"
|
||||
);
|
||||
}
|
||||
|
||||
// Notify API that task was executed
|
||||
let ack_url = format!(
|
||||
"{}/api/internal/schedules/{}/ack",
|
||||
self.api_url, task.id
|
||||
);
|
||||
let _ = client
|
||||
.post(&ack_url)
|
||||
.bearer_auth(&self.node_token)
|
||||
.send()
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Execute a single scheduled task.
|
||||
async fn execute_task(&self, task: &ScheduledTask) -> Result<()> {
|
||||
info!(
|
||||
task_id = %task.id,
|
||||
action = %task.action,
|
||||
server = %task.server_uuid,
|
||||
"Executing scheduled task"
|
||||
);
|
||||
|
||||
match task.action.as_str() {
|
||||
"command" => {
|
||||
// Send command to server's stdin via Docker exec
|
||||
let docker = self.server_manager.docker();
|
||||
docker
|
||||
.send_command(&task.server_uuid, &task.payload)
|
||||
.await?;
|
||||
}
|
||||
"power" => {
|
||||
match task.payload.as_str() {
|
||||
"start" => self.server_manager.start_server(&task.server_uuid).await?,
|
||||
"stop" => self.server_manager.stop_server(&task.server_uuid).await?,
|
||||
"restart" => {
|
||||
let _ = self.server_manager.stop_server(&task.server_uuid).await;
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
self.server_manager.start_server(&task.server_uuid).await?;
|
||||
}
|
||||
"kill" => self.server_manager.kill_server(&task.server_uuid).await?,
|
||||
_ => warn!(payload = %task.payload, "Unknown power action"),
|
||||
}
|
||||
}
|
||||
"backup" => {
|
||||
// Trigger backup via the backup module
|
||||
info!(
|
||||
server = %task.server_uuid,
|
||||
"Backup scheduled task — delegating to backup module"
|
||||
);
|
||||
// Backup is handled by sending callback to API
|
||||
let client = reqwest::Client::new();
|
||||
let url = format!(
|
||||
"{}/api/internal/servers/{}/backup",
|
||||
self.api_url, task.server_uuid
|
||||
);
|
||||
let _ = client
|
||||
.post(&url)
|
||||
.bearer_auth(&self.node_token)
|
||||
.json(&serde_json::json!({ "name": format!("auto-{}", task.id) }))
|
||||
.send()
|
||||
.await;
|
||||
}
|
||||
_ => {
|
||||
warn!(action = %task.action, "Unknown scheduled action");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user