diff --git a/msmonitor/dynolog_npu/CMakeLists.txt b/msmonitor/dynolog_npu/CMakeLists.txt index d0da4c68f37a0b22bad7c873e8c17c66c0c03d5b..7b5e70bb375e0fd232f3b246bf9f8d350f069635 100644 --- a/msmonitor/dynolog_npu/CMakeLists.txt +++ b/msmonitor/dynolog_npu/CMakeLists.txt @@ -78,9 +78,11 @@ target_link_libraries(dynolog_lib PUBLIC pfs) add_subdirectory(third_party/fmt) target_link_libraries(dynolog_lib PUBLIC fmt::fmt) -add_subdirectory(third_party/tensorboard_logger) -target_include_directories(dynolog_lib PUBLIC third_party/tensorboard_logger/include) -target_link_libraries(dynolog_lib PUBLIC tensorboard_logger) +if(USE_PROMETHEUS) + add_subdirectory(third_party/tensorboard_logger) + target_include_directories(dynolog_lib PUBLIC third_party/tensorboard_logger/include) + target_link_libraries(dynolog_lib PUBLIC tensorboard_logger) +endif() if(USE_ODS_GRAPH_API) add_subdirectory(third_party/cpr) diff --git a/msmonitor/dynolog_npu/cli/Cargo.toml b/msmonitor/dynolog_npu/cli/Cargo.toml index 7d87551ba4f2e9dc3b6710ab44964365e41910e3..87de1502af24cb6c7ef9d59bfc6a05c83d7a1456 100644 --- a/msmonitor/dynolog_npu/cli/Cargo.toml +++ b/msmonitor/dynolog_npu/cli/Cargo.toml @@ -7,9 +7,6 @@ edition = "2021" anyhow = "1.0.57" clap = { version = "3.1.0", features = ["derive"]} serde_json = "1.0" -rustls = "0.21.0" -rustls-pemfile = "1.0" -webpki = "0.22" [net] git-fetch-with-cli = true diff --git a/msmonitor/dynolog_npu/cli/src/commands/dcgm.rs b/msmonitor/dynolog_npu/cli/src/commands/dcgm.rs deleted file mode 100644 index a5261fc8acefe0199340b9d7ca77903a533ee3d7..0000000000000000000000000000000000000000 --- a/msmonitor/dynolog_npu/cli/src/commands/dcgm.rs +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright (c) Meta Platforms, Inc. and affiliates. -// -// This source code is licensed under the MIT license found in the -// LICENSE file in the root directory of this source tree. - -use std::net::TcpStream; -use rustls::{ClientConnection, StreamOwned}; - -use anyhow::Result; - -#[path = "utils.rs"] -mod utils; - -// This module contains the handling logic for dcgm - -/// Pause dcgm module profiling -pub fn run_dcgm_pause( - mut client: StreamOwned, - duration_s: i32, -) -> Result<()> { - let request_json = format!( - r#" -{{ - "fn": "dcgmProfPause", - "duration_s": {} -}}"#, - duration_s - ); - - utils::send_msg(&mut client, &request_json).expect("Error sending message to service"); - - let resp_str = utils::get_resp(&mut client).expect("Unable to decode output bytes"); - - println!("response = {}", resp_str); - - Ok(()) -} - -/// Resume dcgm module profiling -pub fn run_dcgm_resume(mut client: StreamOwned) -> Result<()> { - utils::send_msg(&mut client, r#"{"fn":"dcgmProfResume"}"#) - .expect("Error sending message to service"); - - let resp_str = utils::get_resp(&mut client).expect("Unable to decode output bytes"); - - println!("response = {}", resp_str); - - Ok(()) -} \ No newline at end of file diff --git a/msmonitor/dynolog_npu/cli/src/commands/gputrace.rs b/msmonitor/dynolog_npu/cli/src/commands/gputrace.rs deleted file mode 100644 index c27b7534e06a8ed8569a44dadaaf2654da093589..0000000000000000000000000000000000000000 --- a/msmonitor/dynolog_npu/cli/src/commands/gputrace.rs +++ /dev/null @@ -1,217 +0,0 @@ -// Copyright (c) Meta Platforms, Inc. and affiliates. -// -// This source code is licensed under the MIT license found in the -// LICENSE file in the root directory of this source tree. - -use std::net::TcpStream; -use rustls::{ClientConnection, StreamOwned}; - -use anyhow::Result; -use serde_json::Value; - -#[path = "utils.rs"] -mod utils; - -// This module contains the handling logic for dyno gputrace - -#[derive(Debug)] -pub enum GpuTraceTriggerConfig { - DurationBased { - profile_start_time: u64, - duration_ms: u64, - }, - IterationBased { - profile_start_iteration_roundup: u64, - iterations: i64, - }, -} - -impl GpuTraceTriggerConfig { - fn config(&self) -> String { - match *self { - GpuTraceTriggerConfig::DurationBased { - profile_start_time, - duration_ms, - } => format!( - "PROFILE_START_TIME={}\nACTIVITIES_DURATION_MSECS={}", - profile_start_time, duration_ms - ), - GpuTraceTriggerConfig::IterationBased { - profile_start_iteration_roundup, - iterations, - } => format!( - r#"PROFILE_START_ITERATION=0 -PROFILE_START_ITERATION_ROUNDUP={} -ACTIVITIES_ITERATIONS={}"#, - profile_start_iteration_roundup, iterations - ), - } - } -} - -#[derive(Debug)] -pub struct GpuTraceOptions { - pub record_shapes: bool, - pub profile_memory: bool, - pub with_stacks: bool, - pub with_flops: bool, - pub with_modules: bool, -} - -impl GpuTraceOptions { - fn config(&self) -> String { - format!( - r#" -PROFILE_REPORT_INPUT_SHAPES={} -PROFILE_PROFILE_MEMORY={} -PROFILE_WITH_STACK={} -PROFILE_WITH_FLOPS={} -PROFILE_WITH_MODULES={}"#, - self.record_shapes, - self.profile_memory, - self.with_stacks, - self.with_flops, - self.with_modules - ) - } -} - -#[derive(Debug)] -pub struct GpuTraceConfig { - pub log_file: String, - pub trigger_config: GpuTraceTriggerConfig, - pub trace_options: GpuTraceOptions, -} - -impl GpuTraceConfig { - fn config(&self) -> String { - format!( - "ACTIVITIES_LOG_FILE={}\n{}{}", - self.log_file, - self.trigger_config.config(), - self.trace_options.config() - ) - } -} - -/// Gputrace command triggers GPU profiling on pytorch apps -pub fn run_gputrace( - mut client: StreamOwned, - job_id: u64, - pids: &str, - process_limit: u32, - config: GpuTraceConfig, -) -> Result<()> { - let kineto_config = config.config(); - println!("Kineto config = \n{}", kineto_config); - let kineto_config = kineto_config.replace('\n', "\\n"); - - let request_json = format!( - r#" -{{ - "fn": "setKinetOnDemandRequest", - "config": "{}", - "job_id": {}, - "pids": [{}], - "process_limit": {} -}}"#, - kineto_config, job_id, pids, process_limit - ); - - utils::send_msg(&mut client, &request_json).expect("Error sending message to service"); - - let resp_str = utils::get_resp(&mut client).expect("Unable to decode output bytes"); - - println!("response = {}", resp_str); - - let resp_v: Value = serde_json::from_str(&resp_str)?; - let processes = resp_v["processesMatched"].as_array().unwrap(); - - if processes.is_empty() { - println!("No processes were matched, please check --job-id or --pids flags"); - } else { - println!("Matched {} processes", processes.len()); - println!("Trace output files will be written to:"); - - for pid in processes { - let pid = pid.as_i64().unwrap(); - println!( - " {}", - config.log_file.replace(".json", &format!("_{}.json", pid)) - ); - } - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - use crate::*; - - #[test] - fn test_gputrace_trigger_config() { - let trigger_config = GpuTraceTriggerConfig::DurationBased { - profile_start_time: 1000, - duration_ms: 42, - }; - assert_eq!( - trigger_config.config(), - r#"PROFILE_START_TIME=1000 -ACTIVITIES_DURATION_MSECS=42"# - ); - - let trigger_config = GpuTraceTriggerConfig::IterationBased { - profile_start_iteration_roundup: 1000, - iterations: 42, - }; - assert_eq!( - trigger_config.config(), - r#"PROFILE_START_ITERATION=0 -PROFILE_START_ITERATION_ROUNDUP=1000 -ACTIVITIES_ITERATIONS=42"# - ); - } - - #[test] - fn test_gputrace_config() { - let mut test_trace_options = GpuTraceOptions { - record_shapes: true, - profile_memory: false, - with_stacks: true, - with_flops: false, - with_modules: true, - }; - assert_eq!( - test_trace_options.config(), - r#" -PROFILE_REPORT_INPUT_SHAPES=true -PROFILE_PROFILE_MEMORY=false -PROFILE_WITH_STACK=true -PROFILE_WITH_FLOPS=false -PROFILE_WITH_MODULES=true"# - ); - - test_trace_options.profile_memory = true; - - let test_trace_config = GpuTraceConfig { - log_file: String::from("/tmp/test_trace.json"), - trigger_config: GpuTraceTriggerConfig::DurationBased { - profile_start_time: 1000, - duration_ms: 42, - }, - trace_options: test_trace_options, - }; - assert_eq!( - test_trace_config.config(), - r#"ACTIVITIES_LOG_FILE=/tmp/test_trace.json -PROFILE_START_TIME=1000 -ACTIVITIES_DURATION_MSECS=42 -PROFILE_REPORT_INPUT_SHAPES=true -PROFILE_PROFILE_MEMORY=true -PROFILE_WITH_STACK=true -PROFILE_WITH_FLOPS=false -PROFILE_WITH_MODULES=true"# - ); - } -} \ No newline at end of file diff --git a/msmonitor/dynolog_npu/cli/src/commands/npumonitor.rs b/msmonitor/dynolog_npu/cli/src/commands/npumonitor.rs index f8f73c5b959af37973552286426d6a20edea650f..1edfaea5939f5cee5df8618720d1bfa16d0071b5 100644 --- a/msmonitor/dynolog_npu/cli/src/commands/npumonitor.rs +++ b/msmonitor/dynolog_npu/cli/src/commands/npumonitor.rs @@ -1,4 +1,3 @@ -use rustls::{ClientConnection, StreamOwned}; use std::net::TcpStream; use anyhow::Result; @@ -31,7 +30,7 @@ MSPTI_ACTIVITY_KIND={}"#, } pub fn run_npumonitor( - mut client: StreamOwned, + client: TcpStream, config: NpuMonitorConfig, ) -> Result<()> { let config_str = config.config(); @@ -50,9 +49,9 @@ pub fn run_npumonitor( config_str ); - utils::send_msg(&mut client, &request_json).expect("Error sending message to service"); + utils::send_msg(&client, &request_json).expect("Error sending message to service"); - let resp_str = utils::get_resp(&mut client).expect("Unable to decode output bytes"); + let resp_str = utils::get_resp(&client).expect("Unable to decode output bytes"); println!("response = {}", resp_str); diff --git a/msmonitor/dynolog_npu/cli/src/commands/nputrace.rs b/msmonitor/dynolog_npu/cli/src/commands/nputrace.rs index f7b14f9b11ff4ee219c75de90c581cf6135590cf..d1e9b45f34bc5df9001fd0e68fe52ea3d15704ab 100644 --- a/msmonitor/dynolog_npu/cli/src/commands/nputrace.rs +++ b/msmonitor/dynolog_npu/cli/src/commands/nputrace.rs @@ -1,5 +1,4 @@ use std::net::TcpStream; -use rustls::{ClientConnection, StreamOwned}; use anyhow::Result; use serde_json::Value; @@ -134,7 +133,7 @@ impl NpuTraceConfig { } pub fn run_nputrace( - mut client: StreamOwned, + client: TcpStream, job_id: u64, pids: &str, process_limit: u32, @@ -156,9 +155,9 @@ pub fn run_nputrace( config_str, job_id, pids, process_limit ); - utils::send_msg(&mut client, &request_json).expect("Error sending message to service"); + utils::send_msg(&client, &request_json).expect("Error sending message to service"); - let resp_str = utils::get_resp(&mut client).expect("Unable to decode output bytes"); + let resp_str = utils::get_resp(&client).expect("Unable to decode output bytes"); println!("response = {}", resp_str); diff --git a/msmonitor/dynolog_npu/cli/src/commands/status.rs b/msmonitor/dynolog_npu/cli/src/commands/status.rs deleted file mode 100644 index 46a56b6c64582c1b710d7cf0d8beba0c87728525..0000000000000000000000000000000000000000 --- a/msmonitor/dynolog_npu/cli/src/commands/status.rs +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (c) Meta Platforms, Inc. and affiliates. -// -// This source code is licensed under the MIT license found in the -// LICENSE file in the root directory of this source tree. - -use rustls::{ClientConnection, StreamOwned}; -use std::net::TcpStream; - -use anyhow::Result; - -#[path = "utils.rs"] -mod utils; - -// This module contains the handling logic for dyno status - -/// Get system info -pub fn run_status(mut client: StreamOwned) -> Result<()> { - utils::send_msg(&mut client, r#"{"fn":"getStatus"}"#).expect("Error sending message to service"); - - let resp_str = utils::get_resp(&mut client).expect("Unable to decode output bytes"); - - println!("response = {}", resp_str); - - Ok(()) -} \ No newline at end of file diff --git a/msmonitor/dynolog_npu/cli/src/commands/utils.rs b/msmonitor/dynolog_npu/cli/src/commands/utils.rs deleted file mode 100644 index ab78ec1a8ab35f75076715766a02ffb39a7682d9..0000000000000000000000000000000000000000 --- a/msmonitor/dynolog_npu/cli/src/commands/utils.rs +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (c) Meta Platforms, Inc. and affiliates. -// -// This source code is licensed under the MIT license found in the -// LICENSE file in the root directory of this source tree. - -use std::io::{Read, Write}; - -use anyhow::Result; - -pub fn send_msg(client: &mut T, msg: &str) -> Result<()> { - let msg_len: [u8; 4] = i32::try_from(msg.len()).unwrap().to_ne_bytes(); - - client.write_all(&msg_len)?; - client.write_all(msg.as_bytes()).map_err(|err| err.into()) -} - -pub fn get_resp(client: &mut T) -> Result { - // Response is prefixed with length - let mut resp_len: [u8; 4] = [0; 4]; - client.read_exact(&mut resp_len)?; - - let resp_len = i32::from_ne_bytes(resp_len); - let resp_len = usize::try_from(resp_len).unwrap(); - - println!("response length = {}", resp_len); - - let mut resp_str = Vec::::new(); - resp_str.resize(resp_len, 0); - - client.read_exact(resp_str.as_mut_slice())?; - - String::from_utf8(resp_str).map_err(|err| err.into()) -} \ No newline at end of file diff --git a/msmonitor/dynolog_npu/cli/src/commands/version.rs b/msmonitor/dynolog_npu/cli/src/commands/version.rs deleted file mode 100644 index 5a29a85aaad3a7affe508e4c400de1b4e16beee0..0000000000000000000000000000000000000000 --- a/msmonitor/dynolog_npu/cli/src/commands/version.rs +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (c) Meta Platforms, Inc. and affiliates. -// -// This source code is licensed under the MIT license found in the -// LICENSE file in the root directory of this source tree. - -use rustls::{ClientConnection, StreamOwned}; -use std::net::TcpStream; -use anyhow::Result; - -#[path = "utils.rs"] -mod utils; - -// This module contains the handling logic for querying dyno version - -/// Get version info -pub fn run_version(mut client: StreamOwned) -> Result<()> { - utils::send_msg(&mut client, r#"{"fn":"getVersion"}"#).expect("Error sending message to service"); - - let resp_str = utils::get_resp(&mut client).expect("Unable to decode output bytes"); - - println!("response = {}", resp_str); - - Ok(()) -} \ No newline at end of file diff --git a/msmonitor/dynolog_npu/cli/src/main.rs b/msmonitor/dynolog_npu/cli/src/main.rs index a71150a5af5c6db09e15faeb73e1ec6dc8ab4b82..4fa7f07146231046c232374fae2a8171cabf3e69 100644 --- a/msmonitor/dynolog_npu/cli/src/main.rs +++ b/msmonitor/dynolog_npu/cli/src/main.rs @@ -2,14 +2,9 @@ // // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. -use std::fs::File; -use std::io::BufReader; -use rustls::{Certificate, RootCertStore, PrivateKey, ClientConnection, StreamOwned}; -use std::sync::Arc; + use std::net::TcpStream; use std::net::ToSocketAddrs; -use std::path::PathBuf; -use std::io; use anyhow::Result; use clap::Parser; @@ -49,8 +44,6 @@ struct Opts { hostname: String, #[clap(long, default_value_t = DYNO_PORT)] port: u16, - #[clap(long, required = true)] - certs_dir: String, #[clap(subcommand)] cmd: Command, } @@ -253,98 +246,28 @@ enum Command { DcgmResume, } -struct ClientConfigPath { - cert_path: PathBuf, - key_path: PathBuf, - ca_cert_path: PathBuf, -} - -fn create_dyno_client( - host: &str, - port: u16, - config: &ClientConfigPath -) -> Result> { +/// Create a socket connection to dynolog +fn create_dyno_client(host: &str, port: u16) -> Result { let addr = (host, port) .to_socket_addrs()? .next() - .ok_or_else(|| io::Error::new( - io::ErrorKind::NotFound, - "Could not resolve the host address" - ))?; - - let stream = TcpStream::connect(addr)?; - - println!("Loading CA cert from: {}", config.ca_cert_path.display()); - let mut root_store = RootCertStore::empty(); - let ca_file = File::open(&config.ca_cert_path)?; - let mut ca_reader = BufReader::new(ca_file); - let ca_certs = rustls_pemfile::certs(&mut ca_reader)?; - for ca_cert in ca_certs { - root_store.add(&Certificate(ca_cert))?; - } - - println!("Loading client cert from: {}", config.cert_path.display()); - let cert_file = File::open(&config.cert_path)?; - let mut cert_reader = BufReader::new(cert_file); - let certs = rustls_pemfile::certs(&mut cert_reader)? - .into_iter() - .map(Certificate) - .collect(); - - println!("Loading client key from: {}", config.key_path.display()); - let key_file = File::open(&config.key_path)?; - let mut key_reader = BufReader::new(key_file); - let keys = rustls_pemfile::pkcs8_private_keys(&mut key_reader)?; - if keys.is_empty() { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "No private key found in the key file" - ).into()); - } - let key = PrivateKey(keys[0].clone()); - - let config = rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(root_store) - .with_client_auth_cert(certs, key)?; - - let server_name = rustls::ServerName::try_from(host) - .map_err(|e| io::Error::new( - io::ErrorKind::InvalidInput, - format!("Invalid hostname: {}", e) - ))?; - - let conn = rustls::ClientConnection::new( - Arc::new(config), - server_name - )?; - - // 返回 TLS stream - Ok(StreamOwned::new(conn, stream)) + .expect("Failed to connect to the server"); + TcpStream::connect(addr).map_err(|err| err.into()) } fn main() -> Result<()> { let Opts { hostname, port, - certs_dir, cmd, } = Opts::parse(); - let certs_dir = PathBuf::from(&certs_dir); - - let config = ClientConfigPath { - cert_path: certs_dir.join("client.crt"), - key_path: certs_dir.join("client.key"), - ca_cert_path: certs_dir.join("ca.crt"), - }; - - let client = create_dyno_client(&hostname, port, &config) - .expect("Couldn't connect to the server..."); + let dyno_client = + create_dyno_client(&hostname, port).expect("Couldn't connect to the server..."); match cmd { - Command::Status => status::run_status(client), - Command::Version => version::run_version(client), + Command::Status => status::run_status(dyno_client), + Command::Version => version::run_version(dyno_client), Command::Gputrace { job_id, pids, @@ -383,7 +306,7 @@ fn main() -> Result<()> { trigger_config, trace_options, }; - gputrace::run_gputrace(client, job_id, &pids, process_limit, trace_config) + gputrace::run_gputrace(dyno_client, job_id, &pids, process_limit, trace_config) } Command::Nputrace { job_id, @@ -454,7 +377,7 @@ fn main() -> Result<()> { trigger_config, trace_options, }; - nputrace::run_nputrace(client, job_id, &pids, process_limit, trace_config) + nputrace::run_nputrace(dyno_client, job_id, &pids, process_limit, trace_config) } Command::NpuMonitor { npu_monitor_start, @@ -468,10 +391,10 @@ fn main() -> Result<()> { report_interval_s, mspti_activity_kind }; - npumonitor::run_npumonitor(client, npu_mon_config) + npumonitor::run_npumonitor(dyno_client, npu_mon_config) } - Command::DcgmPause { duration_s } => dcgm::run_dcgm_pause(client, duration_s), - Command::DcgmResume => dcgm::run_dcgm_resume(client), + Command::DcgmPause { duration_s } => dcgm::run_dcgm_pause(dyno_client, duration_s), + Command::DcgmResume => dcgm::run_dcgm_resume(dyno_client), // ... add new commands here } } \ No newline at end of file diff --git a/msmonitor/dynolog_npu/dynolog/src/DynologTensorBoardLogger.cpp b/msmonitor/dynolog_npu/dynolog/src/DynologTensorBoardLogger.cpp index 9b15a2a8a12f46aba85807d8a6ed740a52403d9f..b5f282e3133976b80f3046b5aef765268eb79764 100644 --- a/msmonitor/dynolog_npu/dynolog/src/DynologTensorBoardLogger.cpp +++ b/msmonitor/dynolog_npu/dynolog/src/DynologTensorBoardLogger.cpp @@ -11,6 +11,7 @@ #include #include +#ifdef USE_PROMETHEUS DEFINE_string(metric_log_dir, "", "The Path to store tensorboard logs"); namespace dynolog { @@ -136,4 +137,5 @@ void TensorBoardLoggerImpl::log(const std::string &key, double val, uint64_t ste } logger_->add_scalar(key, step, val); } -} \ No newline at end of file +} +#endif \ No newline at end of file diff --git a/msmonitor/dynolog_npu/dynolog/src/DynologTensorBoardLogger.h b/msmonitor/dynolog_npu/dynolog/src/DynologTensorBoardLogger.h index 4b174dbaf98b2e75eed5555ac3fa62500134a471..e1ed949e0df64450fade529d05033f16c6d76f2e 100644 --- a/msmonitor/dynolog_npu/dynolog/src/DynologTensorBoardLogger.h +++ b/msmonitor/dynolog_npu/dynolog/src/DynologTensorBoardLogger.h @@ -9,6 +9,8 @@ #include "MsMonitorMetrics.h" +#ifdef USE_PROMETHEUS + #include "tensorboard_logger.h" DECLARE_string(metric_log_dir); @@ -94,4 +96,5 @@ private: std::string hostName_; }; -} // namespace dynolog \ No newline at end of file +} // namespace dynolog +#endif \ No newline at end of file diff --git a/msmonitor/dynolog_npu/dynolog/src/Main.cpp b/msmonitor/dynolog_npu/dynolog/src/Main.cpp index 693b729bbd20e3aa23436ac6fbffdf6f85c88726..6e89a57425aa80c3186e4105f31a2f4d8bd7d1e1 100644 --- a/msmonitor/dynolog_npu/dynolog/src/Main.cpp +++ b/msmonitor/dynolog_npu/dynolog/src/Main.cpp @@ -69,6 +69,9 @@ std::unique_ptr getLogger(const std::string& scribe_category = "") { if (FLAGS_use_prometheus) { loggers.push_back(std::make_unique()); } + if (!FLAGS_metric_log_dir.empty()) { + loggers.push_back(std::make_unique(FLAGS_metric_log_dir)); + } #endif if (FLAGS_use_fbrelay) { loggers.push_back(std::make_unique()); @@ -82,9 +85,6 @@ std::unique_ptr getLogger(const std::string& scribe_category = "") { if (FLAGS_use_scuba && !scribe_category.empty()) { loggers.push_back(std::make_unique(scribe_category)); } - if (!FLAGS_metric_log_dir.empty()) { - loggers.push_back(std::make_unique(FLAGS_metric_log_dir)); - } return std::make_unique(std::move(loggers)); } diff --git a/msmonitor/dynolog_npu/dynolog/src/rpc/CMakeLists.txt b/msmonitor/dynolog_npu/dynolog/src/rpc/CMakeLists.txt deleted file mode 100644 index a0b74f82cf9be5cec400e6477183b15a52b76cdc..0000000000000000000000000000000000000000 --- a/msmonitor/dynolog_npu/dynolog/src/rpc/CMakeLists.txt +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -find_package(OpenSSL REQUIRED) - -add_library(dynolog_rpc_lib STATIC - SimpleJsonServer.cpp SimpleJsonServer.h - ${CMAKE_CURRENT_SOURCE_DIR}/../ServiceHandler.h -) -target_include_directories(dynolog_rpc_lib - INTERFACE ${CMAKE_CURRENT_SOURCE_DIR} -) - -target_include_directories(dynolog_rpc_lib - PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/.. -) -target_link_libraries(dynolog_rpc_lib PRIVATE dynolog_lib) -target_link_libraries(dynolog_rpc_lib PUBLIC gflags::gflags) -target_link_libraries(dynolog_rpc_lib PUBLIC glog::glog) -target_link_libraries(dynolog_rpc_lib PUBLIC nlohmann_json::nlohmann_json) -target_link_libraries(dynolog_rpc_lib PUBLIC fmt::fmt) -target_link_libraries(dynolog_rpc_lib PRIVATE OpenSSL::SSL OpenSSL::Crypto) \ No newline at end of file diff --git a/msmonitor/dynolog_npu/dynolog/src/rpc/SimpleJsonServer.cpp b/msmonitor/dynolog_npu/dynolog/src/rpc/SimpleJsonServer.cpp deleted file mode 100644 index 17a6d42895b8d81a8818defa6defd3a5f3ffd1c6..0000000000000000000000000000000000000000 --- a/msmonitor/dynolog_npu/dynolog/src/rpc/SimpleJsonServer.cpp +++ /dev/null @@ -1,290 +0,0 @@ -// Copyright (c) Meta Platforms, Inc. and affiliates. -// -// This source code is licensed under the MIT license found in the -// LICENSE file in the root directory of this source tree. - -#include "dynolog/src/rpc/SimpleJsonServer.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include - -DEFINE_string(certs_dir, "", "TLS crets dir"); - -constexpr int CLIENT_QUEUE_LEN = 50; - -namespace dynolog { - -SimpleJsonServerBase::SimpleJsonServerBase(int port) : port_(port) { - initSocket(); - init_openssl(); - ctx_ = create_context(); - configure_context(ctx_); -} - -SimpleJsonServerBase::~SimpleJsonServerBase() { - if (thread_) { - stop(); - } - close(sock_fd_); -} - -void SimpleJsonServerBase::initSocket() { - struct sockaddr_in6 server_addr; - - /* Create socket for listening (client requests).*/ - sock_fd_ = ::socket(AF_INET6, SOCK_STREAM, 0); - if (sock_fd_ == -1) { - std::perror("socket()"); - return; - } - - /* Set socket to reuse address in case server is restarted.*/ - int flag = 1; - int ret = - ::setsockopt(sock_fd_, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)); - if (ret == -1) { - std::perror("setsockopt()"); - return; - } - - // in6addr_any allows us to bind to both IPv4 and IPv6 clients. - server_addr.sin6_addr = in6addr_any; - server_addr.sin6_family = AF_INET6; - server_addr.sin6_port = htons(port_); - - /* Bind address and socket together */ - ret = ::bind(sock_fd_, (struct sockaddr*)&server_addr, sizeof(server_addr)); - if (ret == -1) { - std::perror("bind()"); - close(sock_fd_); - return; - } - - /* Create listening queue (client requests) */ - ret = ::listen(sock_fd_, CLIENT_QUEUE_LEN); - if (ret == -1) { - std::perror("listen()"); - close(sock_fd_); - return; - } - - /* Get port if assigned 0 */ - if (port_ == 0) { - socklen_t len_out = sizeof(server_addr); - ret = ::getsockname(sock_fd_, (struct sockaddr*)&server_addr, &len_out); - if (ret < 0 || len_out != sizeof(server_addr)) { - std::perror("getsockname()"); - } else { - port_ = ntohs(server_addr.sin6_port); - LOG(INFO) << "System assigned port = " << ntohs(server_addr.sin6_port); - } - } - - LOG(INFO) << "Listening to connections on port " << port_; - initSuccess_ = true; -} - -/* A simple wrapper to accept connections and read data - * - * Messages are prefixed using the length so we know how long a message - * to actually read. - * : int32_t len - * : char json[] - */ -class ClientSocketWrapper { - public: - ~ClientSocketWrapper() { - if (ssl_) { - SSL_shutdown(ssl_); - SSL_free(ssl_); - } - if (client_sock_fd_ != -1) { - ::close(client_sock_fd_); - } - } - - bool accept(int server_socket, SSL_CTX* ctx) { - struct sockaddr_in6 client_addr; - socklen_t client_addr_len = sizeof(client_addr); - std::array client_addr_str; - - client_sock_fd_ = ::accept( - server_socket, (struct sockaddr*)&client_addr, &client_addr_len); - if (client_sock_fd_ == -1) { - std::perror("accept()"); - return false; - } - - inet_ntop( - AF_INET6, - &(client_addr.sin6_addr), - client_addr_str.data(), - client_addr_str.size()); - LOG(INFO) << "Received connection from " << client_addr_str.data(); - - ssl_ = SSL_new(ctx); - SSL_set_fd(ssl_, client_sock_fd_); - if (SSL_accept(ssl_) <= 0) { - ERR_print_errors_fp(stderr); - return false; - } - LOG(INFO) << "SSL handshake success"; - return true; - } - - std::string get_message() { - int32_t msg_size = -1; - if (!read_helper((uint8_t*)&msg_size, sizeof(msg_size)) || msg_size <= 0) { - LOG(ERROR) << "Invalid message size = " << msg_size; - return ""; - } - std::string message; - message.resize(msg_size); - int recv = 0; - int ret = 1; - while (recv < msg_size && ret > 0) { - ret = read_helper((uint8_t*)&message[recv], msg_size - recv); - recv += ret > 0 ? ret : 0; - } - if (recv != msg_size) { - LOG(ERROR) << "Received partial message, expected size " << msg_size - << " found : " << recv; - LOG(ERROR) << "Message received = " << message; - return ""; - } - return message; - } - - bool send_response(const std::string& response) { - int32_t size = response.size(); - int ret = SSL_write(ssl_, (void*)&size, sizeof(size)); - if (ret <= 0) { - ERR_print_errors_fp(stderr); - return false; - } - int sent = 0; - while (sent < size && ret > 0) { - ret = SSL_write(ssl_, (void*)&response[sent], size - sent); - if (ret <= 0) { - ERR_print_errors_fp(stderr); - } else { - sent += ret; - } - } - if (sent < response.size()) { - LOG(ERROR) << "Unable to write full response"; - return false; - } - return ret > 0; - } - - private: - int read_helper(uint8_t* buf, int size) { - int ret = SSL_read(ssl_, (void*)buf, size); - if (ret <= 0) { - ERR_print_errors_fp(stderr); - } - return ret; - } - - int client_sock_fd_ = -1; - SSL* ssl_ = nullptr; -}; - -/* Accepts socket connections and processes the payloads. - * This will inturn call the Handler functions*/ -void SimpleJsonServerBase::loop() noexcept { - if (sock_fd_ == -1 || !initSuccess_) { - return; - } - - while (run_) { - processOne(); - } -} - -void SimpleJsonServerBase::processOne() noexcept { - LOG(INFO) << "Waiting for connection."; - ClientSocketWrapper client; - if (!client.accept(sock_fd_, ctx_)) { - return; - } - std::string request_str = client.get_message(); - LOG(INFO) << "RPC message received = " << request_str; - auto response_str = processOneImpl(request_str); - if (response_str.empty()) { - return; - } - if (!client.send_response(response_str)) { - LOG(ERROR) << "Failed to send response"; - } -} - -void SimpleJsonServerBase::run() { - LOG(INFO) << "Launching RPC thread"; - thread_ = std::make_unique([this]() { this->loop(); }); -} - -void SimpleJsonServerBase::init_openssl() -{ - SSL_load_error_strings(); - OpenSSL_add_ssl_algorithms(); -} - -SSL_CTX* SimpleJsonServerBase::create_context() -{ - const SSL_METHOD* method = TLS_server_method(); - SSL_CTX* ctx = SSL_CTX_new(method); - if (!ctx) { - perror("Unable to create SSL context"); - ERR_print_errors_fp(stderr); - exit(EXIT_FAILURE); - } - return ctx; -} - -void SimpleJsonServerBase::configure_context(SSL_CTX* ctx) -{ - if (FLAGS_certs_dir.empty()) { - LOG(ERROR) << "--certs-dir must be specified!"; - exit(EXIT_FAILURE); - } - - std::string certs_dir = FLAGS_certs_dir; - if (!certs_dir.empty() && certs_dir.back() != '/') - certs_dir += '/'; - - std::string server_cert = certs_dir + "server.crt"; - std::string server_key = certs_dir + "server.key"; - std::string ca_cert = certs_dir + "ca.crt"; - - LOG(INFO) << "Loading server cert: " << server_cert; - LOG(INFO) << "Loading server key: " << server_key; - LOG(INFO) << "Loading CA cert: " << ca_cert; - - // 加载服务器证书 - if (SSL_CTX_use_certificate_file(ctx, server_cert.c_str(), SSL_FILETYPE_PEM) <= 0) { - ERR_print_errors_fp(stderr); - exit(EXIT_FAILURE); - } - // 加载服务器私钥 - if (SSL_CTX_use_PrivateKey_file(ctx, server_key.c_str(), SSL_FILETYPE_PEM) <= 0 ) { - ERR_print_errors_fp(stderr); - exit(EXIT_FAILURE); - } - // 加载CA证书,实现客户端证书校验 - if (SSL_CTX_load_verify_locations(ctx, ca_cert.c_str(), NULL) <= 0) { - ERR_print_errors_fp(stderr); - exit(EXIT_FAILURE); - } - // 要求客户端必须提供证书 - SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL); -} - -} // namespace dynolog \ No newline at end of file diff --git a/msmonitor/dynolog_npu/dynolog/src/rpc/SimpleJsonServer.h b/msmonitor/dynolog_npu/dynolog/src/rpc/SimpleJsonServer.h deleted file mode 100644 index df5d66f75b54e88dd4c0dff01b7c28ef545cb106..0000000000000000000000000000000000000000 --- a/msmonitor/dynolog_npu/dynolog/src/rpc/SimpleJsonServer.h +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (c) Meta Platforms, Inc. and affiliates. -// -// This source code is licensed under the MIT license found in the -// LICENSE file in the root directory of this source tree. - -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include "dynolog/src/ServiceHandler.h" - -DECLARE_string(certs_dir); - -namespace dynolog { - -// This is a simple service built using UNIX Sockets -// with remote procedure calls implemented via JSON string. - -class SimpleJsonServerBase { - public: - explicit SimpleJsonServerBase(int port); - virtual ~SimpleJsonServerBase(); - - int getPort() const { - return port_; - } - - bool initSuccessful() const { - return initSuccess_; - } - // spin up a new thread to process requets - void run(); - - void stop() { - run_ = 0; - thread_->join(); - } - - // synchronously processes a request - void processOne() noexcept; - - protected: - void initSocket(); - void init_openssl(); - SSL_CTX* create_context(); - void configure_context(SSL_CTX* ctx); - - // process requests in a loop - void loop() noexcept; - - // implement processing of request using the handler - virtual std::string processOneImpl(const std::string& request_str) { - return ""; - } - - int port_; - int sock_fd_{-1}; - bool initSuccess_{false}; - - std::atomic run_{true}; - std::unique_ptr thread_; - - SSL_CTX* ctx_{nullptr}; -}; - -} // namespace dynolog \ No newline at end of file diff --git a/msmonitor/plugin/bindings.cpp b/msmonitor/plugin/bindings.cpp index b08f7e3e3df0c9fb0d2905cd7463480bf1b17b7d..79f75ac3e7241e01ff4acb7de152f47f5ae898d3 100644 --- a/msmonitor/plugin/bindings.cpp +++ b/msmonitor/plugin/bindings.cpp @@ -4,27 +4,11 @@ namespace py = pybind11; -void init_IPCMonitor(PyObject *module) { - py::class_(module, "PyDynamicMonitorProxy") +PYBIND11_MODULE(IPCMonitor, m) { + py::class_(m, "PyDynamicMonitorProxy") .def(py::init<>()) .def("init_dyno", &dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::InitDyno, py::arg("npuId")) .def("poll_dyno", &dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::PollDyno) .def("enable_dyno_npu_monitor", &dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::EnableMsptiMonitor, py::arg("cfg_map")) .def("finalize_dyno", &dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::FinalizeDyno); -} - -static PyMethodDef g_moduleMethods[] = {}; - -static struct PyModuleDef ipcMonitor_module = { - PyModuleDef_HEAD_INIT, - "IPCMonitor", - nullptr, - -1, - g_moduleMethods -}; - -PyMODINIT_FUNC PyInit_IPCMonitor(void) { - PyObject* m = PyModule_Create(&ipcMonitor_module); - init_IPCMonitor(m); - return m; } \ No newline at end of file diff --git a/msmonitor/scripts/build.sh b/msmonitor/scripts/build.sh index f2203d1734537b55bab7c61f1240424afdd550f4..a7aaa726dee761a157f63913e38fdf1726429ff9 100644 --- a/msmonitor/scripts/build.sh +++ b/msmonitor/scripts/build.sh @@ -1,6 +1,5 @@ #!/bin/bash set -e -export BUILD_PROMETHEUS=1 check_gcc_version() { if ! command -v gcc >/dev/null 2>&1; then