Skip to content

Commit

Permalink
Hydra console api, process/runtime debug apis. (#7)
Browse files Browse the repository at this point in the history
* Start console server actor, used for remote console api.

* Prevent console server from being constructed.

* Adds Process::info, more console server remote debug api.

* Clean up info call.

* Add a console api call to get runtime information.

* Log console server shutdown reason.

* Make runtime info public.

* Add method to connect to a console server remotely.

* Update CHANGELOG.md
  • Loading branch information
dtzxporter authored Jun 11, 2024
1 parent 080a776 commit e34e4bf
Show file tree
Hide file tree
Showing 14 changed files with 321 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Unreleased

### Added
- Process::info to get debug information on a specific local process.
- ConsoleServer, enabled using the `console` feature, which provides a remote debug api for gathering debug information in realtime.

### Changed

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ serde = { version = "1.0", default-features = false, features = ["derive", "std"
rmp-serde = "1.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "net", "sync", "time", "macros", "signal"] }
tokio-util = { version = "0.7", default-features = false, features = ["codec"] }
tokio-tungstenite = { version = "0.21", default-features = false, features = ["handshake"] }
tokio-tungstenite = { version = "0.23", default-features = false, features = ["handshake"] }
tokio-native-tls = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std", "sink"] }
tracing = { version = "0.1", default-features = false, features = ["std"] }
Expand Down
2 changes: 1 addition & 1 deletion hydra-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"
publish = false

[dependencies]
hydra = { path = "../hydra", default-features = false, features = ["tracing"] }
hydra = { path = "../hydra", default-features = false, features = ["tracing", "console"] }
tracing = "0.1"

serde.workspace = true
1 change: 1 addition & 0 deletions hydra/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ description.workspace = true
default = ["macros", "tracing"]
tracing = ["dep:tracing", "dep:tracing-subscriber"]
macros = ["dep:hydra-macros"]
console = []

[dependencies]
flume.workspace = true
Expand Down
17 changes: 17 additions & 0 deletions hydra/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ use crate::Process;
use crate::ProcessFlags;
use crate::SystemMessage;

#[cfg(feature = "console")]
use crate::ConsoleServer;

/// Messages used internally by [Application].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
enum ApplicationMessage {
Expand Down Expand Up @@ -73,6 +76,12 @@ pub trait Application: Sized + Send + 'static {
Process::spawn(async move {
Process::set_flags(ProcessFlags::TRAP_EXIT);

#[cfg(feature="console")]
let mut cpid = ConsoleServer::new()
.start_link()
.await
.expect("Failed to start console server!");

match self.start().await {
Ok(pid) => {
#[cfg(feature = "tracing")]
Expand Down Expand Up @@ -111,6 +120,14 @@ pub trait Application: Sized + Send + 'static {
Process::exit(pid, ExitReason::from("shutdown"));
Process::send_after(Process::current(), ShutdownTimeout, config.graceful_shutdown_timeout);
}

#[cfg(feature = "console")]
if cpid == epid && ereason != "shutdown" {
cpid = ConsoleServer::new()
.start_link()
.await
.expect("Failed to restart console server!");
}
}
_ => continue,
}
Expand Down
148 changes: 148 additions & 0 deletions hydra/src/console.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use serde::Deserialize;
use serde::Serialize;

use crate::CallError;
use crate::Dest;
use crate::ExitReason;
use crate::From;
use crate::GenServer;
use crate::GenServerOptions;
use crate::Node;
use crate::NodeState;
use crate::Pid;
use crate::Process;
use crate::ProcessInfo;
use crate::RuntimeInfo;

/// Unique registered name for the console server process.
///
/// The server registers itself under this name in `ConsoleServer::start_link`, and
/// `ConsoleServer::connect` uses the same name to address the server on another node.
const CONSOLE_NAME: &str = "$hydra_console";

/// Message used by the console server.
///
/// Variants come in request/reply pairs: every request variant is answered by the
/// console server with the `*Success` variant of the same name.
#[derive(Serialize, Deserialize)]
pub enum ConsoleServerMessage {
/// Request: asks the console server to identify itself.
Connect,
/// Reply to `Connect`: the [Pid] of the console server process.
ConnectSuccess(Pid),
/// Request: lists the nodes that are in the given [NodeState].
ListNodes(NodeState),
/// Reply to `ListNodes`: the nodes that matched the requested state.
ListNodesSuccess(Vec<Node>),
/// Request: lists the processes known to the server's hydra instance.
ListProcesses,
/// Reply to `ListProcesses`: the result of `Process::list` on the server.
ListProcessesSuccess(Vec<Pid>),
/// Request: fetches debug information for each of the given processes.
ProcessesInfo(Vec<Pid>),
/// Reply to `ProcessesInfo`: one entry per requested pid, in request order.
/// An entry is `None` when no information was available for that pid.
ProcessesInfoSuccess(Vec<Option<ProcessInfo>>),
/// Request: fetches runtime information for the server's hydra instance.
ListRuntimeInfo,
/// Reply to `ListRuntimeInfo`: the result of `RuntimeInfo::load` on the server.
ListRuntimeInfoSuccess(RuntimeInfo),
}

/// Console acts as a relay to `hydra-console`. It collects and sends realtime information about the current hydra instance.
///
/// The server is started by the application, not by user code; the public
/// associated functions on this type are the client side of the remote debug api.
pub struct ConsoleServer {
/// Used to prevent anyone from just constructing the console server, it's handled by applications.
/// The field is private and carries no state; its value is never read.
#[allow(unused)]
_ignore: bool,
}

impl ConsoleServer {
/// Constructs a new instance of [ConsoleServer].
pub(super) const fn new() -> Self {
Self { _ignore: true }
}

/// Starts a [ConsoleServer] process linked to the current process.
pub(super) async fn start_link(self) -> Result<Pid, ExitReason> {
GenServer::start_link(self, GenServerOptions::new().name(CONSOLE_NAME)).await
}

/// Connects to the [ConsoleServer] on the given node, returning it's [Pid] if successful.
pub async fn connect<T: Into<Node>>(node: T) -> Result<Pid, CallError> {
use ConsoleServerMessage::*;

match ConsoleServer::call((CONSOLE_NAME, node), Connect, None).await? {
ConnectSuccess(pid) => Ok(pid),
_ => unreachable!(),
}
}

/// Requests the connected node list based on the state.
pub async fn list_nodes<T: Into<Dest>>(
server: T,
state: NodeState,
) -> Result<Vec<Node>, CallError> {
use ConsoleServerMessage::*;

match ConsoleServer::call(server.into(), ListNodes(state), None).await? {
ListNodesSuccess(nodes) => Ok(nodes),
_ => unreachable!(),
}
}

/// Requests the list of running processes.
pub async fn list_processes<T: Into<Dest>>(server: T) -> Result<Vec<Pid>, CallError> {
use ConsoleServerMessage::*;

match ConsoleServer::call(server.into(), ListProcesses, None).await? {
ListProcessesSuccess(processes) => Ok(processes),
_ => unreachable!(),
}
}

/// Requests process info for the given processes.
pub async fn processes_info<T: Into<Dest>>(
server: T,
processes: Vec<Pid>,
) -> Result<Vec<Option<ProcessInfo>>, CallError> {
use ConsoleServerMessage::*;

match ConsoleServer::call(server.into(), ProcessesInfo(processes), None).await? {
ProcessesInfoSuccess(info) => Ok(info),
_ => unreachable!(),
}
}

/// Requests runtime info for the hydra instance.
pub async fn runtime_info<T: Into<Dest>>(server: T) -> Result<RuntimeInfo, CallError> {
use ConsoleServerMessage::*;

match ConsoleServer::call(server.into(), ListRuntimeInfo, None).await? {
ListRuntimeInfoSuccess(info) => Ok(info),
_ => unreachable!(),
}
}
}

impl GenServer for ConsoleServer {
    type Message = ConsoleServerMessage;

    /// Logs that the console server has started (when `tracing` is enabled).
    async fn init(&mut self) -> Result<(), ExitReason> {
        #[cfg(feature = "tracing")]
        tracing::info!(console = ?Process::current(), "Console server has started");

        Ok(())
    }

    /// Logs the reason the console server is shutting down (when `tracing` is enabled).
    async fn terminate(&mut self, reason: ExitReason) {
        #[cfg(feature = "tracing")]
        tracing::info!(console = ?Process::current(), reason = ?reason, "Console server has terminated");

        // Without tracing the reason is otherwise unused.
        #[cfg(not(feature = "tracing"))]
        let _ = reason;
    }

    /// Answers a console request with the matching `*Success` reply.
    ///
    /// For `ProcessesInfo`, the reply holds one entry per requested pid, in order.
    /// Remote pids are answered with `None` instead of being passed to
    /// `Process::info`, which panics on remote pids.
    async fn handle_call(
        &mut self,
        message: Self::Message,
        _from: From,
    ) -> Result<Option<Self::Message>, ExitReason> {
        use ConsoleServerMessage::*;

        match message {
            Connect => Ok(Some(ConnectSuccess(Process::current()))),
            ListNodes(state) => Ok(Some(ListNodesSuccess(Node::list_by_state(state)))),
            ListProcesses => Ok(Some(ListProcessesSuccess(Process::list()))),
            ProcessesInfo(pids) => {
                // The pid list comes from the (possibly remote) caller, so it must not
                // be able to take the console server down by naming a remote process.
                let process_info = pids
                    .into_iter()
                    .map(|pid| {
                        if pid.is_remote() {
                            None
                        } else {
                            Process::info(pid)
                        }
                    })
                    .collect();

                Ok(Some(ProcessesInfoSuccess(process_info)))
            }
            ListRuntimeInfo => Ok(Some(ListRuntimeInfoSuccess(RuntimeInfo::load()))),
            // Reply variants are never valid requests. They are listed explicitly so that
            // adding a new request variant is a compile error here rather than a panic.
            ConnectSuccess(_)
            | ListNodesSuccess(_)
            | ListProcessesSuccess(_)
            | ProcessesInfoSuccess(_)
            | ListRuntimeInfoSuccess(_) => unreachable!(),
        }
    }
}
12 changes: 12 additions & 0 deletions hydra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mod node_state;
mod pid;
mod process;
mod process_flags;
mod process_info;
mod process_item;
mod process_kernel;
mod process_monitor;
Expand All @@ -47,6 +48,11 @@ mod system_message;
mod task;
mod timeout;

#[cfg(feature = "console")]
mod console;
#[cfg(feature = "console")]
mod runtime_info;

pub use application::*;
pub use application_config::*;
pub use argument_error::*;
Expand All @@ -66,6 +72,7 @@ pub use node_state::*;
pub use pid::*;
pub use process::*;
pub use process_flags::*;
pub use process_info::*;
pub use process_receiver::*;
pub use receivable::*;
pub use reference::*;
Expand All @@ -85,6 +92,11 @@ pub use hydra_macros::main;
#[cfg(feature = "macros")]
pub use hydra_macros::test;

#[cfg(feature = "console")]
pub use console::*;
#[cfg(feature = "console")]
pub use runtime_info::*;

pub(crate) use alias::*;
pub(crate) use catch_unwind::*;
pub(crate) use link::*;
Expand Down
10 changes: 10 additions & 0 deletions hydra/src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::process_exit_signal_linked;
use crate::ExitReason;
use crate::Node;
use crate::Pid;
use crate::ProcessInfo;

/// A collection of local processes linked to another process.
static LINKS: Lazy<DashMap<u64, BTreeSet<Pid>>> = Lazy::new(DashMap::new);
Expand Down Expand Up @@ -121,3 +122,12 @@ pub fn link_process_down(from: Pid, exit_reason: ExitReason) {
node_send_frame(link_down.into(), node);
}
}

/// Copies the pids currently linked to `pid` into `info.links`.
///
/// `info` is left untouched when `pid` has no entry in the link table.
pub fn link_fill_info(pid: Pid, info: &mut ProcessInfo) {
    if let Some(entry) = LINKS.get(&pid.id()) {
        info.links = entry.value().iter().copied().collect();
    }
}
10 changes: 10 additions & 0 deletions hydra/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::Dest;
use crate::ExitReason;
use crate::Node;
use crate::Pid;
use crate::ProcessInfo;
use crate::ProcessItem;
use crate::ProcessMonitor;
use crate::Reference;
Expand Down Expand Up @@ -249,3 +250,12 @@ pub fn monitor_process_down(from: Pid, exit_reason: ExitReason) {
node_send_frame(monitor_down.into(), node);
}
}

/// Fills `info.monitored_by` from the monitor table entry for `pid`.
///
/// The first element of each stored monitor is recorded (presumably the pid of the
/// monitoring process — confirm against the `MONITORS` value type). `info` is left
/// untouched when `pid` has no entry in the monitor table.
pub fn monitor_fill_info(pid: Pid, info: &mut ProcessInfo) {
    if let Some(entry) = MONITORS.get(&pid.id()) {
        info.monitored_by = entry.value().values().map(|monitor| monitor.0).collect();
    }
}
5 changes: 4 additions & 1 deletion hydra/src/node_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use serde::Deserialize;
use serde::Serialize;

/// The different states a node can be in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub enum NodeState {
/// This node is the local node.
Current,
Expand Down
22 changes: 22 additions & 0 deletions hydra/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ use crate::alias_destroy;
use crate::alias_destroy_all;
use crate::link_create;
use crate::link_destroy;
use crate::link_fill_info;
use crate::link_install;
use crate::link_process_down;
use crate::monitor_create;
use crate::monitor_destroy;
use crate::monitor_destroy_all;
use crate::monitor_fill_info;
use crate::monitor_install;
use crate::monitor_process_down;
use crate::node_process_send_exit;
Expand All @@ -28,6 +30,7 @@ use crate::process_destroy_timer;
use crate::process_drop;
use crate::process_exit;
use crate::process_flags;
use crate::process_info;
use crate::process_insert;
use crate::process_list;
use crate::process_name_list;
Expand All @@ -48,6 +51,7 @@ use crate::ExitReason;
use crate::Message;
use crate::Pid;
use crate::ProcessFlags;
use crate::ProcessInfo;
use crate::ProcessItem;
use crate::ProcessMonitor;
use crate::ProcessReceiver;
Expand Down Expand Up @@ -390,6 +394,24 @@ impl Process {
node_process_send_exit(pid, Self::current(), exit_reason);
}
}

/// Fetches debug information for a given local process.
///
/// Returns `None` when `process_info` has nothing for `pid`; otherwise the returned
/// info is completed with the process's link and monitor information.
///
/// # Panics
///
/// Panics if `pid` refers to a remote process.
#[must_use]
pub fn info(pid: Pid) -> Option<ProcessInfo> {
    assert!(
        !pid.is_remote(),
        "Can't query information on a remote process!"
    );

    let mut details = process_info(pid)?;

    link_fill_info(pid, &mut details);
    monitor_fill_info(pid, &mut details);

    Some(details)
}
}

impl Drop for Process {
Expand Down
Loading

0 comments on commit e34e4bf

Please sign in to comment.