diff --git a/gitp2p-rs/README.md b/gitp2p-rs/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..ab7807a42bc8fdc9c29536e1d528a0dab21d5182
--- /dev/null
+++ b/gitp2p-rs/README.md
@@ -0,0 +1,5 @@
+This directory contains the p2p part of the mega project; it cannot be run on its own.
+
+For details, see
+
+https://github.com/web3infra-foundation/mega.git
\ No newline at end of file
diff --git a/gitp2p-rs/p2p/BUILD b/gitp2p-rs/p2p/BUILD
new file mode 100644
index 0000000000000000000000000000000000000000..a0c1979df585dbaec44eb66704800e527bb93460
--- /dev/null
+++ b/gitp2p-rs/p2p/BUILD
@@ -0,0 +1,46 @@
+
+load("@crate_index//:defs.bzl", "aliases", "all_crate_deps")
+load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test", "rust_doc_test")
+
+
+rust_library(
+    name = "p2p",
+    srcs = glob([
+        "src/**/*.rs",
+    ]),
+    aliases = aliases(),
+    deps = all_crate_deps() + [
+        "//git",
+        "//common",
+        "//database",
+        "//database/entity"
+    ],
+    proc_macro_deps = all_crate_deps(
+        proc_macro = True,
+    ),
+    visibility = ["//visibility:public"],
+)
+
+rust_test(
+    name = "test",
+    crate = ":p2p",
+    aliases = aliases(
+        normal_dev = True,
+        proc_macro_dev = True,
+    ),
+    deps = all_crate_deps(
+        normal_dev = True,
+    ) + [
+        "//git",
+        "//common",
+        "//database",
+    ],
+    proc_macro_deps = all_crate_deps(
+        proc_macro_dev = True,
+    ),
+)
+
+rust_doc_test(
+    name = "doctests",
+    crate = ":p2p",
+)
diff --git a/gitp2p-rs/p2p/Cargo.toml b/gitp2p-rs/p2p/Cargo.toml
new file mode 100644
index 0000000000000000000000000000000000000000..524f62f755bc23ea87c62b8fb61d1a51fb4cb215
--- /dev/null
+++ b/gitp2p-rs/p2p/Cargo.toml
@@ -0,0 +1,28 @@
+[package]
+name = "p2p"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[lib]
+name = "p2p"
+path = "src/lib.rs"
+
+
+[dependencies]
+git = { path = "../git" }
+database = { path = "../database" }
+entity = { path = "../database/entity" }
+common = { path = "../common" }
+bytes = "1.4.0"
+tokio = "1.32.0"
+tracing = "0.1.37"
+futures = "0.3.28"
+futures-timer = "3.0.2"
+async-std = { version = "1.10", features = ["attributes"] }
+libp2p = { version = "0.52.3", features = ["dcutr", "kad", "yamux", "noise", "identify", "macros", "relay", "tcp", "async-std", "rendezvous", "request-response", "cbor"] }
+serde = { version = "1.0.188", features = ["derive"] }
+clap = { version = "4.4.0", features = ["derive"] }
+#sea-orm = "0.12.2"
+serde_json = "1.0.105"
diff --git a/gitp2p-rs/p2p/README.md b/gitp2p-rs/p2p/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..ccd046fbbb85ddbec4a94796c58106148308ec5e
--- /dev/null
+++ b/gitp2p-rs/p2p/README.md
@@ -0,0 +1,47 @@
+## How to use the p2p function
+
+### Start a relay server
+
+```
+cargo run p2p --host 0.0.0.0 --port 8001 --relay-server
+```
+
+### Start a client
+
+```
+cargo run p2p --host 0.0.0.0 --port 8002 --bootstrap-node /ip4/{relay-server-ip}/tcp/8001
+```
+
+### Start another client
+
+```
+cargo run p2p --host 0.0.0.0 --port 8003 --bootstrap-node /ip4/{relay-server-ip}/tcp/8001
+```
+
+### Try out the DHT
+
+#### Put a key-value pair into the p2p network in one terminal
+
+```
+kad put 123 abc
+```
+
+#### Get a key-value pair from the p2p network in another terminal
+
+```
+kad get 123
+```
+
+### Try to clone or pull a repository
+
+```
+mega clone p2p://12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X/mega_test.git
+```
+
+```
+mega pull p2p://12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X/mega_test.git
+```
+
+```
+mega clone-object mega_test.git
+```
\ No newline at end of file
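The CLI flags shown in the README map directly onto libp2p multiaddrs: `--host`/`--port` form the address the node listens on, and `--bootstrap-node` is the relay server's address that clients dial before registering with rendezvous. A minimal sketch of that mapping (the IP addresses are placeholders, not part of this patch), assuming the libp2p version pinned in the Cargo.toml above:

```rust
use libp2p::Multiaddr;

fn main() {
    // `--host 0.0.0.0 --port 8001` becomes the address the swarm listens on.
    let listen: Multiaddr = "/ip4/0.0.0.0/tcp/8001".parse().expect("valid listen multiaddr");
    // `--bootstrap-node` is the relay server's multiaddr; clients dial it first.
    // 127.0.0.1 stands in for {relay-server-ip}.
    let bootstrap: Multiaddr = "/ip4/127.0.0.1/tcp/8001".parse().expect("valid bootstrap multiaddr");
    println!("listen on {listen}, bootstrap via {bootstrap}");
}
```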
diff --git a/gitp2p-rs/p2p/src/lib.rs b/gitp2p-rs/p2p/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..94cffc74a45d7aa5b11aafe8d6b050bf779c76dc --- /dev/null +++ b/gitp2p-rs/p2p/src/lib.rs @@ -0,0 +1,35 @@ +//! +//! +//! +//! +//! +//! + +use database::driver::ObjectStorage; +use git::protocol::{PackProtocol, Protocol}; +use std::path::PathBuf; +use std::sync::Arc; + +pub mod network; +pub mod node; +pub mod peer; + +async fn get_pack_protocol(path: &str, storage: Arc) -> PackProtocol { + let path = del_ends_str(path, ".git"); + PackProtocol::new(PathBuf::from(path), storage, Protocol::P2p) +} + +pub fn get_repo_full_path(repo_name: &str) -> String { + let repo_name = del_ends_str(repo_name, ".git"); + "/root/".to_string() + repo_name +} + +pub fn del_ends_str<'a>(mut s: &'a str, end: &str) -> &'a str { + if s.ends_with(end) { + s = s.split_at(s.len() - end.len()).0; + } + s +} + +#[cfg(test)] +mod tests {} diff --git a/gitp2p-rs/p2p/src/network/behaviour.rs b/gitp2p-rs/p2p/src/network/behaviour.rs new file mode 100644 index 0000000000000000000000000000000000000000..e38e0f9138f6dc3bfb224d0d0812280e2e330a7c --- /dev/null +++ b/gitp2p-rs/p2p/src/network/behaviour.rs @@ -0,0 +1,103 @@ +use entity::git_obj; +use libp2p::kad::store::MemoryStore; +use libp2p::kad::{Kademlia, KademliaEvent}; +use libp2p::swarm::NetworkBehaviour; +use libp2p::{dcutr, identify, relay, rendezvous, request_response}; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; + +#[derive(NetworkBehaviour)] +#[behaviour(to_swarm = "Event")] +pub struct Behaviour { + pub relay_client: relay::client::Behaviour, + pub identify: identify::Behaviour, + pub dcutr: dcutr::Behaviour, + pub kademlia: Kademlia, + pub rendezvous: rendezvous::client::Behaviour, + pub git_upload_pack: request_response::cbor::Behaviour, + pub git_info_refs: request_response::cbor::Behaviour, + pub git_object: request_response::cbor::Behaviour, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct GitUploadPackReq( + //want + pub HashSet, + //have + pub HashSet, + //path + pub String, +); +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct GitUploadPackRes(pub Vec, pub String); + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct GitInfoRefsReq(pub String); +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct GitInfoRefsRes(pub String, pub Vec); + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct GitObjectReq(pub String, pub Vec); +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct GitObjectRes(pub Vec); + +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +pub enum Event { + Identify(identify::Event), + RelayClient(relay::client::Event), + Dcutr(dcutr::Event), + Kademlia(KademliaEvent), + Rendezvous(rendezvous::client::Event), + GitUploadPack(request_response::Event), + GitInfoRefs(request_response::Event), + GitObject(request_response::Event), +} + +impl From for Event { + fn from(e: identify::Event) -> Self { + Event::Identify(e) + } +} + +impl From for Event { + fn from(e: relay::client::Event) -> Self { + Event::RelayClient(e) + } +} + +impl From for Event { + fn from(e: dcutr::Event) -> Self { + Event::Dcutr(e) + } +} + +impl From for Event { + fn from(e: KademliaEvent) -> Self { + Event::Kademlia(e) + } +} + +impl From for Event { + fn from(e: rendezvous::client::Event) -> Self { + Event::Rendezvous(e) + } +} + +impl From> for Event { + fn from(event: 
request_response::Event) -> Self { + Event::GitUploadPack(event) + } +} + +impl From> for Event { + fn from(event: request_response::Event) -> Self { + Event::GitInfoRefs(event) + } +} + +impl From> for Event { + fn from(event: request_response::Event) -> Self { + Event::GitObject(event) + } +} diff --git a/gitp2p-rs/p2p/src/network/event_handler.rs b/gitp2p-rs/p2p/src/network/event_handler.rs new file mode 100644 index 0000000000000000000000000000000000000000..b3c701371203a1a2cc74534b1b2b21046286097a --- /dev/null +++ b/gitp2p-rs/p2p/src/network/event_handler.rs @@ -0,0 +1,663 @@ +use super::behaviour; +use crate::network::behaviour::{ + GitInfoRefsReq, GitInfoRefsRes, GitObjectReq, GitObjectRes, GitUploadPackReq, GitUploadPackRes, +}; +use crate::node::{get_utc_timestamp, ClientParas, Fork, MegaRepoInfo}; +use crate::{get_pack_protocol, get_repo_full_path}; +use bytes::Bytes; +use common::utils; +use entity::git_obj::Model; +use git::protocol::{CommandType, RefCommand}; +use git::structure::conversion; +use libp2p::kad::record::Key; +use libp2p::kad::store::MemoryStore; +use libp2p::kad::{ + AddProviderOk, GetClosestPeersOk, GetProvidersOk, GetRecordOk, Kademlia, KademliaEvent, + PeerRecord, PutRecordOk, QueryResult, Quorum, Record, +}; +use libp2p::{identify, multiaddr, rendezvous, request_response, PeerId, Swarm}; +use std::collections::HashSet; +use std::path::Path; +use std::str::FromStr; + +pub const NAMESPACE: &str = "rendezvous_mega"; + +pub fn kad_event_handler( + swarm: &mut Swarm, + client_paras: &mut ClientParas, + event: KademliaEvent, +) { + if let KademliaEvent::OutboundQueryProgressed { id, result, .. } = event { + match result { + QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(PeerRecord { record, peer }))) => { + let peer_id = match peer { + Some(id) => id.to_string(), + None => "local".to_string(), + }; + tracing::info!( + "Got record key[{}]={},from {}", + String::from_utf8(record.key.to_vec()).unwrap(), + String::from_utf8(record.value.clone()).unwrap(), + peer_id + ); + if let Some(object_id) = client_paras.pending_repo_info_update_fork.get(&id) { + tracing::info!("update repo info forks"); + // update repo info forks + if let Ok(p) = serde_json::from_slice(&record.value) { + let mut repo_info: MegaRepoInfo = p; + let fork = Fork { + peer: swarm.local_peer_id().to_string(), + latest: object_id.clone(), + timestamp: get_utc_timestamp(), + }; + repo_info.forks.push(fork); + let record = Record { + key: Key::new(&repo_info.name), + value: serde_json::to_vec(&repo_info).unwrap(), + publisher: None, + expires: None, + }; + if let Err(e) = swarm + .behaviour_mut() + .kademlia + .put_record(record, Quorum::One) + { + eprintln!("Failed to store record:{}", e); + } + } + client_paras.pending_repo_info_update_fork.remove(&id); + } else if let Some(repo_name) = client_paras + .pending_repo_info_search_to_download_obj + .get(&id) + { + //try to search origin node + tracing::info!("try to get origin node to search git_obj_id_list"); + if let Ok(p) = serde_json::from_slice(&record.value) { + let repo_info: MegaRepoInfo = p; + //save all node that have this repo,the first one is origin + let mut node_id_list: Vec = Vec::new(); + node_id_list.push(repo_info.origin.clone()); + for fork in &repo_info.forks { + node_id_list.push(fork.peer.clone()); + } + client_paras + .repo_node_list + .insert(repo_name.clone(), node_id_list); + let remote_peer_id = PeerId::from_str(&repo_info.origin).unwrap(); + let path = get_repo_full_path(repo_name); + //to get all git_obj id + let 
request_file_id = swarm + .behaviour_mut() + .git_info_refs + .send_request(&remote_peer_id, GitInfoRefsReq(path)); + client_paras + .pending_git_obj_id_download + .insert(request_file_id, repo_name.to_string()); + } + } + client_paras + .pending_repo_info_search_to_download_obj + .remove(&id); + } + QueryResult::GetRecord(Err(err)) => { + tracing::error!("Failed to get record: {err:?}"); + } + QueryResult::PutRecord(Ok(PutRecordOk { key })) => { + tracing::info!( + "Successfully put record {:?}", + std::str::from_utf8(key.as_ref()).unwrap() + ); + } + QueryResult::PutRecord(Err(err)) => { + tracing::error!("Failed to put record: {err:?}"); + } + QueryResult::GetClosestPeers(Ok(GetClosestPeersOk { peers, .. })) => { + for x in peers { + tracing::info!("{}", x); + } + } + QueryResult::GetClosestPeers(Err(err)) => { + tracing::error!("Failed to get closest peers: {err:?}"); + } + QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { providers, .. }), ..) => { + tracing::info!("FoundProviders: {providers:?}"); + } + QueryResult::GetProviders(Err(e)) => { + tracing::error!("GetProviders error: {e:?}"); + } + QueryResult::StartProviding(Ok(AddProviderOk { key, .. }), ..) => { + tracing::info!("StartProviding: {key:?}"); + } + _ => {} + } + } +} + +pub fn rendezvous_client_event_handler( + swarm: &mut Swarm, + client_paras: &mut ClientParas, + event: rendezvous::client::Event, +) { + match event { + rendezvous::client::Event::Registered { + namespace, + ttl, + rendezvous_node, + } => { + tracing::info!( + "Registered for namespace '{}' at rendezvous point {} for the next {} seconds", + namespace, + rendezvous_node, + ttl + ); + if let Some(rendezvous_point) = client_paras.rendezvous_point { + swarm.behaviour_mut().rendezvous.discover( + Some(rendezvous::Namespace::from_static(NAMESPACE)), + client_paras.cookie.clone(), + None, + rendezvous_point, + ) + } + } + rendezvous::client::Event::Discovered { + registrations, + cookie: new_cookie, + .. + } => { + client_paras.cookie.replace(new_cookie); + for registration in registrations { + for address in registration.record.addresses() { + let peer = registration.record.peer_id(); + tracing::info!("Rendezvous Discovered peer {} at {}", peer, address); + if peer == *swarm.local_peer_id() { + continue; + } + //dial via relay + if let Some(bootstrap_address) = client_paras.bootstrap_node_addr.clone() { + swarm + .dial( + bootstrap_address + .with(multiaddr::Protocol::P2pCircuit) + .with(multiaddr::Protocol::P2p(peer)), + ) + .unwrap(); + } + } + } + } + rendezvous::client::Event::RegisterFailed { error, .. } => { + tracing::error!("Failed to register: error_code={:?}", error); + } + rendezvous::client::Event::DiscoverFailed { + rendezvous_node, + namespace, + error, + } => { + tracing::error!( + "Failed to discover: rendezvous_node={}, namespace={:?}, error_code={:?}", + rendezvous_node, + namespace, + error + ); + } + event => { + tracing::debug!("Rendezvous client event:{:?}", event); + } + } +} + +pub fn rendezvous_server_event_handler(event: rendezvous::server::Event) { + match event { + rendezvous::server::Event::PeerRegistered { peer, registration } => { + tracing::info!( + "Peer {} registered for namespace '{}'", + peer, + registration.namespace + ); + } + rendezvous::server::Event::DiscoverServed { + enquirer, + registrations, + .. 
+ } => { + tracing::info!( + "Served peer {} with {} registrations", + enquirer, + registrations.len() + ); + } + + event => { + tracing::info!("Rendezvous server event:{:?}", event); + } + } +} + +pub fn identify_event_handler(kademlia: &mut Kademlia, event: identify::Event) { + match event { + identify::Event::Received { peer_id, info } => { + tracing::info!("IdentifyEvent Received peer_id:{:?}", peer_id); + tracing::info!("IdentifyEvent Received info:{:?}", info); + for addr in info.listen_addrs { + kademlia.add_address(&peer_id, addr); + } + } + + identify::Event::Error { error, .. } => { + tracing::debug!("IdentifyEvent Error :{:?}", error); + } + _ => {} + } +} + +pub async fn git_upload_pack_event_handler( + swarm: &mut Swarm, + client_paras: &mut ClientParas, + event: request_response::Event, +) { + match event { + request_response::Event::Message { message, .. } => match message { + request_response::Message::Request { + request, channel, .. + } => { + //receive git upload pack request + tracing::debug!( + "Git upload pack event handler, {:?}, {:?}", + request, + channel + ); + let want = request.0; + let have = request.1; + let path = request.2; + tracing::info!("path: {}", path); + match git_upload_pack_handler(&path, client_paras, want, have).await { + Ok((send_pack_data, object_id)) => { + let _ = swarm + .behaviour_mut() + .git_upload_pack + .send_response(channel, GitUploadPackRes(send_pack_data, object_id)); + } + Err(e) => { + tracing::error!("{}", e); + let response = format!("ERR: {}", e); + let _ = swarm.behaviour_mut().git_upload_pack.send_response( + channel, + GitUploadPackRes(response.into_bytes(), utils::ZERO_ID.to_string()), + ); + } + } + } + request_response::Message::Response { + request_id, + response, + } => { + // receive a git_upload_pack response + tracing::info!( + "Git upload pack event response, request_id: {:?}", + request_id, + ); + if let Some(repo_name) = client_paras.pending_git_upload_package.get(&request_id) { + let package_data = response.0; + let object_id = response.1; + if package_data.starts_with("ERR:".as_bytes()) { + tracing::error!("{}", String::from_utf8(package_data).unwrap()); + return; + } + let path = get_repo_full_path(repo_name); + let mut pack_protocol = + get_pack_protocol(&path, client_paras.storage.clone()).await; + let command = RefCommand { + ref_name: String::from("refs/heads/master"), + old_id: String::from("0000000000000000000000000000000000000000"), + new_id: object_id.clone(), + status: String::from("ok"), + error_msg: String::new(), + command_type: CommandType::Create, + }; + pack_protocol.command_list.push(command); + let result = pack_protocol + .git_receive_pack(Bytes::from(package_data)) + .await; + match result { + Ok(_) => { + tracing::info!("Save git package successfully :{}", repo_name); + //update repoInfo + let kad_query_id = swarm + .behaviour_mut() + .kademlia + .get_record(Key::new(&repo_name)); + client_paras + .pending_repo_info_update_fork + .insert(kad_query_id, object_id); + } + Err(e) => { + tracing::error!("{}", e); + } + } + client_paras.pending_git_upload_package.remove(&request_id); + } + } + }, + request_response::Event::OutboundFailure { peer, error, .. 
} => { + tracing::error!("Git upload pack outbound failure: {:?},{:?}", peer, error); + } + event => { + tracing::debug!("Request_response event:{:?}", event); + } + } +} + +pub async fn git_info_refs_event_handler( + swarm: &mut Swarm, + client_paras: &mut ClientParas, + event: request_response::Event, +) { + match event { + request_response::Event::Message { message, peer, .. } => match message { + request_response::Message::Request { + request, channel, .. + } => { + //receive git info refs request + tracing::debug!("Receive git info refs event, {:?}, {:?}", request, channel); + let path = request.0; + tracing::info!("path: {}", path); + let pack_protocol = get_pack_protocol(&path, client_paras.storage.clone()).await; + let ref_git_id = pack_protocol.get_head_object_id(Path::new(&path)).await; + let mut git_ids: Vec = Vec::new(); + if let Ok(commit_models) = + pack_protocol.storage.get_all_commits_by_path(&path).await + { + commit_models.iter().for_each(|model| { + git_ids.push(model.git_id.clone()); + }); + } + if let Ok(blob_and_tree) = pack_protocol + .storage + .get_node_by_path(Path::new(&path)) + .await + { + blob_and_tree.iter().for_each(|model| { + git_ids.push(model.git_id.clone()); + }); + } + let _ = swarm + .behaviour_mut() + .git_info_refs + .send_response(channel, GitInfoRefsRes(ref_git_id, git_ids)); + } + request_response::Message::Response { + request_id, + response, + } => { + //receive git info refs response + tracing::info!("Response git info refs event, request_id: {:?}", request_id); + if let Some(repo_name) = client_paras.pending_git_pull.get(&request_id) { + //have git_ids and try to send pull request + let ref_git_id = response.0; + let _git_ids = response.1; + tracing::info!("repo_name: {}", repo_name); + tracing::info!("ref_git_id: {:?}", ref_git_id); + if ref_git_id == *utils::ZERO_ID { + eprintln!("Repo not found"); + return; + } + let path = get_repo_full_path(repo_name); + let pack_protocol = + get_pack_protocol(&path, client_paras.storage.clone()).await; + //generate want and have collection + let mut want: HashSet = HashSet::new(); + let mut have: HashSet = HashSet::new(); + want.insert(ref_git_id); + let commit_models = pack_protocol + .storage + .get_all_commits_by_path(&path) + .await + .unwrap(); + commit_models.iter().for_each(|model| { + have.insert(model.git_id.clone()); + }); + //send new request to git_upload_pack + let new_request_id = swarm + .behaviour_mut() + .git_upload_pack + .send_request(&peer, GitUploadPackReq(want, have, path)); + client_paras + .pending_git_upload_package + .insert(new_request_id, repo_name.to_string()); + client_paras.pending_git_pull.remove(&request_id); + return; + } + if let Some(repo_name) = client_paras + .pending_git_obj_id_download + .clone() + .get(&request_id) + { + // have git_ids and try to download git obj + let _ref_git_id = response.0; + let git_ids = response.1; + let path = get_repo_full_path(repo_name); + tracing::info!("path: {}", path); + tracing::info!("git_ids: {:?}", git_ids); + //trying to download git_obj from peers + if let Some(repo_list) = client_paras.repo_node_list.get(repo_name) { + if !repo_list.is_empty() { + tracing::info!("try to download git object from: {:?}", repo_list); + tracing::info!("the origin is: {}", repo_list[0]); + // Try to download separately + let split_git_ids = split_array(git_ids.clone(), repo_list.len()); + let repo_id_need_list_arc = client_paras.repo_id_need_list.clone(); + { + let mut repo_id_need_list = repo_id_need_list_arc.lock().unwrap(); + 
repo_id_need_list.insert(repo_name.to_string(), git_ids); + } + + for i in 0..repo_list.len() { + // send get git object request + let ids = split_git_ids[i].clone(); + let repo_peer_id = PeerId::from_str(&repo_list[i].clone()).unwrap(); + let new_request_id = swarm + .behaviour_mut() + .git_object + .send_request(&repo_peer_id, GitObjectReq(path.clone(), ids)); + client_paras + .pending_git_obj_download + .insert(new_request_id, repo_name.to_string()); + } + } + } + client_paras.pending_git_obj_id_download.remove(&request_id); + } + } + }, + request_response::Event::OutboundFailure { peer, error, .. } => { + tracing::error!("Git info refs outbound failure: {:?},{:?}", peer, error); + } + event => { + tracing::debug!("Request_response event:{:?}", event); + } + } +} + +pub async fn git_object_event_handler( + swarm: &mut Swarm, + client_paras: &mut ClientParas, + event: request_response::Event, +) { + match event { + request_response::Event::Message { peer, message, .. } => match message { + request_response::Message::Request { + request, channel, .. + } => { + //receive git object request + tracing::debug!("Receive git object event, {:?}, {:?}", request, channel); + let path = request.0; + let git_ids = request.1; + tracing::info!("path: {}", path); + tracing::info!("git_ids: {:?}", git_ids); + let pack_protocol = get_pack_protocol(&path, client_paras.storage.clone()).await; + let git_obj_models = match pack_protocol.storage.get_obj_data_by_ids(git_ids).await + { + Ok(models) => models, + Err(e) => { + tracing::error!("{:?}", e); + return; + } + }; + let _ = swarm + .behaviour_mut() + .git_object + .send_response(channel, GitObjectRes(git_obj_models)); + } + request_response::Message::Response { + request_id, + response, + } => { + //receive git object response + tracing::debug!("Response git object event, request_id: {:?}", request_id); + let git_obj_models = response.0; + tracing::info!( + "Receive {:?} git_obj, from {:?}", + git_obj_models.len(), + peer + ); + let receive_id_list: Vec = git_obj_models + .clone() + .iter() + .map(|m| m.git_id.clone()) + .collect(); + tracing::info!("git_obj_id_list:{:?}", receive_id_list); + + if let Some(repo_name) = client_paras.pending_git_obj_download.get(&request_id) { + let repo_receive_git_obj_model_list_arc = + client_paras.repo_receive_git_obj_model_list.clone(); + { + let mut receive_git_obj_model_map = + repo_receive_git_obj_model_list_arc.lock().unwrap(); + receive_git_obj_model_map + .entry(repo_name.clone()) + .or_insert(Vec::new()); + let receive_obj_model_list = + receive_git_obj_model_map.get(repo_name).unwrap(); + let mut clone = receive_obj_model_list.clone(); + clone.append(&mut git_obj_models.clone()); + tracing::info!("receive_obj_model_list:{:?}", clone.len()); + receive_git_obj_model_map.insert(repo_name.to_string(), clone); + } + + let repo_id_need_list_arc = client_paras.repo_id_need_list.clone(); + let mut finish = false; + { + let mut repo_id_need_list_map = repo_id_need_list_arc.lock().unwrap(); + if let Some(id_need_list) = repo_id_need_list_map.get(repo_name) { + let mut clone = id_need_list.clone(); + clone.retain(|x| !receive_id_list.contains(x)); + if clone.is_empty() { + finish = true; + } + repo_id_need_list_map.insert(repo_name.to_string(), clone); + } + } + println!("finish:{}", finish); + if finish { + let repo_receive_git_obj_model_list_arc2 = + client_paras.repo_receive_git_obj_model_list.clone(); + let mut obj_model_list: Vec = Vec::new(); + { + let mut receive_git_obj_model_map = + 
repo_receive_git_obj_model_list_arc2.lock().unwrap(); + if !receive_git_obj_model_map.contains_key(repo_name) { + tracing::error!("git_object cache error"); + return; + } + let receive_git_obj_model = + receive_git_obj_model_map.get(repo_name).unwrap(); + obj_model_list.append(&mut receive_git_obj_model.clone()); + receive_git_obj_model_map.remove(repo_name); + } + tracing::info!("receive all git_object :{:?}", obj_model_list.len()); + let path = get_repo_full_path(repo_name); + match conversion::save_node_from_git_obj( + client_paras.storage.clone(), + Path::new(&path), + obj_model_list.clone(), + ) + .await + { + Ok(_) => { + tracing::info!( + "Save {:?} git_obj to database successfully", + obj_model_list.len() + ); + } + Err(e) => { + tracing::error!("{:?}", e); + } + } + } + client_paras.pending_git_obj_download.remove(&request_id); + } + } + }, + request_response::Event::OutboundFailure { peer, error, .. } => { + tracing::error!("Git object outbound failure: {:?},{:?}", peer, error); + } + event => { + tracing::debug!("Request_response event:{:?}", event); + } + } +} + +async fn git_upload_pack_handler( + path: &str, + client_paras: &mut ClientParas, + want: HashSet, + have: HashSet, +) -> Result<(Vec, String), String> { + let pack_protocol = get_pack_protocol(path, client_paras.storage.clone()).await; + let object_id = pack_protocol.get_head_object_id(Path::new(path)).await; + if object_id == *utils::ZERO_ID { + return Err("Repository not found".to_string()); + } + tracing::info!("object_id:{}", object_id); + tracing::info!("want: {:?}, have: {:?}", want, have); + if have.is_empty() { + //clone + let send_pack_data = match pack_protocol.get_full_pack_data(Path::new(path)).await { + Ok(send_pack_data) => send_pack_data, + Err(e) => { + tracing::error!("{}", e); + return Err(e.to_string()); + } + }; + Ok((send_pack_data, object_id)) + } else { + //pull + let send_pack_data = match pack_protocol + .get_incremental_pack_data(Path::new(&path), &want, &have) + .await + { + Ok(send_pack_data) => send_pack_data, + Err(e) => { + tracing::error!("{}", e); + return Err(e.to_string()); + } + }; + Ok((send_pack_data, object_id)) + } +} + +fn split_array(a: Vec, count: usize) -> Vec> { + let mut result = vec![]; + let split_num = a.len() / count; + for i in 0..count { + let v: Vec<_> = if i != count - 1 { + a.clone() + .drain(i * split_num..(i + 1) * split_num) + .collect() + } else { + a.clone().drain(i * split_num..).collect() + }; + result.push(v); + } + result +} diff --git a/gitp2p-rs/p2p/src/network/mod.rs b/gitp2p-rs/p2p/src/network/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..2b2fecb9bb0bda3b66d922d762277e7d4d31a79a --- /dev/null +++ b/gitp2p-rs/p2p/src/network/mod.rs @@ -0,0 +1,12 @@ +//! +//! +//! +//! +//! +//! 
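The object-download path above fans `git_object` requests out across every peer known to hold the repository; `split_array` partitions the object-id list so each peer serves roughly an equal share, with the last peer also taking the remainder. A standalone sketch of that partitioning, assuming the element type is the `String` object id used at the call site:

```rust
/// Partition a list of git object ids across `count` peers, mirroring the
/// `split_array` helper above.
fn split_array(a: Vec<String>, count: usize) -> Vec<Vec<String>> {
    let mut result = vec![];
    let split_num = a.len() / count;
    for i in 0..count {
        let v: Vec<_> = if i != count - 1 {
            a.clone().drain(i * split_num..(i + 1) * split_num).collect()
        } else {
            // The last peer takes whatever is left over.
            a.clone().drain(i * split_num..).collect()
        };
        result.push(v);
    }
    result
}

fn main() {
    // Ten object ids spread over three peers -> chunks of 3, 3 and 4.
    let ids: Vec<String> = (0..10).map(|i| format!("obj{i}")).collect();
    for (peer, chunk) in split_array(ids, 3).into_iter().enumerate() {
        println!("peer {peer} downloads {chunk:?}");
    }
}
```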
+ +pub mod behaviour; +pub mod event_handler; + +#[cfg(test)] +mod tests {} diff --git a/gitp2p-rs/p2p/src/node/client.rs b/gitp2p-rs/p2p/src/node/client.rs new file mode 100644 index 0000000000000000000000000000000000000000..29288a63d131ecffa414f789d99ebdb77ad87c8f --- /dev/null +++ b/gitp2p-rs/p2p/src/node/client.rs @@ -0,0 +1,266 @@ +use super::input_command; +use super::ClientParas; +use crate::network::behaviour::{self, Behaviour, Event}; +use crate::network::event_handler; +use async_std::io; +use async_std::io::prelude::BufReadExt; +use database::DataSource; +use entity::git_obj::Model; +use futures::executor::block_on; +use futures::{future::FutureExt, stream::StreamExt}; +use libp2p::core::upgrade; +use libp2p::kad::store::MemoryStore; +use libp2p::kad::Kademlia; +use libp2p::request_response::ProtocolSupport; +use libp2p::swarm::{SwarmBuilder, SwarmEvent}; +use libp2p::{ + dcutr, identify, identity, multiaddr, noise, relay, rendezvous, request_response, tcp, yamux, + Multiaddr, PeerId, StreamProtocol, Transport, +}; +use std::collections::HashMap; +use std::error::Error; +use std::sync::{Arc, Mutex}; + +pub async fn run( + local_key: identity::Keypair, + p2p_address: String, + bootstrap_node: String, + data_source: DataSource, +) -> Result<(), Box> { + let local_peer_id = PeerId::from(local_key.public()); + tracing::info!("Local peer id: {local_peer_id:?}"); + + let (relay_transport, client) = relay::client::new(local_peer_id); + + let tcp_transport = relay_transport + .or_transport(tcp::async_io::Transport::new( + tcp::Config::default().port_reuse(true), + )) + .upgrade(upgrade::Version::V1) + .authenticate(noise::Config::new(&local_key)?) + // .authenticate(tls::Config::new(&local_key).unwrap()) + .multiplex(yamux::Config::default()) + .boxed(); + + let store = MemoryStore::new(local_peer_id); + + let behaviour = Behaviour { + relay_client: client, + identify: identify::Behaviour::new(identify::Config::new( + "/mega/0.0.1".to_string(), + local_key.public(), + )), + dcutr: dcutr::Behaviour::new(local_peer_id), + //DHT + kademlia: Kademlia::new(local_peer_id, store), + //discover + rendezvous: rendezvous::client::Behaviour::new(local_key), + // git pull, git clone + git_upload_pack: request_response::cbor::Behaviour::new( + [( + StreamProtocol::new("/mega/git_upload_pack"), + ProtocolSupport::Full, + )], + request_response::Config::default(), + ), + // git info refs + git_info_refs: request_response::cbor::Behaviour::new( + [( + StreamProtocol::new("/mega/git_info_refs"), + ProtocolSupport::Full, + )], + request_response::Config::default(), + ), + // git download git_obj + git_object: request_response::cbor::Behaviour::new( + [(StreamProtocol::new("/mega/git_obj"), ProtocolSupport::Full)], + request_response::Config::default(), + ), + }; + let mut swarm = SwarmBuilder::without_executor(tcp_transport, behaviour, local_peer_id).build(); + + // Listen on all interfaces + swarm.listen_on(p2p_address.parse()?)?; + + tracing::info!("Connect to database"); + let storage = database::init(&data_source).await; + let mut client_paras = ClientParas { + cookie: None, + rendezvous_point: None, + bootstrap_node_addr: None, + storage, + pending_git_upload_package: HashMap::new(), + pending_git_pull: HashMap::new(), + pending_git_obj_download: HashMap::new(), + pending_repo_info_update_fork: HashMap::new(), + pending_repo_info_search_to_download_obj: HashMap::new(), + pending_git_obj_id_download: HashMap::new(), + repo_node_list: HashMap::new(), + repo_id_need_list: 
Arc::new(Mutex::new(HashMap::>::new())), + repo_receive_git_obj_model_list: Arc::new(Mutex::new(HashMap::>::new())), + }; + // Wait to listen on all interfaces. + block_on(async { + let mut delay = futures_timer::Delay::new(std::time::Duration::from_secs(1)).fuse(); + loop { + futures::select! { + event = swarm.next() => { + match event.unwrap() { + SwarmEvent::NewListenAddr { address, .. } => { + tracing::info!("Listening on {:?}", address); + } + event => panic!("{event:?}"), + } + } + _ = delay => { + // Likely listening on all interfaces now, thus continuing by breaking the loop. + break; + } + } + } + }); + + //dial to bootstrap_node + if !bootstrap_node.is_empty() { + let bootstrap_node_addr: Multiaddr = bootstrap_node.parse()?; + tracing::info!("Trying to dial bootstrap node{:?}", bootstrap_node_addr); + swarm.dial(bootstrap_node_addr.clone())?; + block_on(async { + let mut learned_observed_addr = false; + let mut told_relay_observed_addr = false; + let mut relay_peer_id: Option = None; + let mut delay = futures_timer::Delay::new(std::time::Duration::from_secs(10)).fuse(); + loop { + futures::select! { + event = swarm.next() => { + match event.unwrap() { + SwarmEvent::NewListenAddr { .. } => {} + SwarmEvent::Dialing{peer_id, ..} => { + tracing::info!("Dialing: {:?}", peer_id) + }, + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + client_paras.rendezvous_point.replace(peer_id); + let p2p_suffix = multiaddr::Protocol::P2p(peer_id); + let bootstrap_node_addr = + if !bootstrap_node_addr.ends_with(&Multiaddr::empty().with(p2p_suffix.clone())) { + bootstrap_node_addr.clone().with(p2p_suffix) + } else { + bootstrap_node_addr.clone() + }; + client_paras.bootstrap_node_addr.replace(bootstrap_node_addr.clone()); + swarm.behaviour_mut().kademlia.add_address(&peer_id.clone(),bootstrap_node_addr.clone()); + tracing::info!("ConnectionEstablished:{} at {}", peer_id, bootstrap_node_addr); + }, + SwarmEvent::Behaviour(behaviour::Event::Identify(identify::Event::Sent { + .. + })) => { + tracing::info!("Told Bootstrap Node our public address."); + told_relay_observed_addr = true; + }, + SwarmEvent::Behaviour(Event::Identify( + identify::Event::Received { + info ,peer_id + }, + )) => { + tracing::info!("Bootstrap Node told us our public address: {:?}", info.observed_addr); + learned_observed_addr = true; + relay_peer_id.replace(peer_id); + }, + event => tracing::info!("{:?}", event), + } + if learned_observed_addr && told_relay_observed_addr { + //success connect to bootstrap node + tracing::info!("Dial bootstrap node successfully"); + if let Some(bootstrap_node_addr) = client_paras.bootstrap_node_addr.clone(){ + let public_addr = bootstrap_node_addr.with(multiaddr::Protocol::P2pCircuit); + swarm.listen_on(public_addr.clone()).unwrap(); + swarm.add_external_address(public_addr); + //register rendezvous + if let Err(error) = swarm.behaviour_mut().rendezvous.register( + rendezvous::Namespace::from_static(event_handler::NAMESPACE), + relay_peer_id.unwrap(), + None, + ){ + tracing::error!("Failed to register: {error}"); + } + } + break; + } + } + _ = delay => { + tracing::error!("Dial bootstrap node failed: Timeout"); + break; + } + } + } + }); + } + + let mut stdin = io::BufReader::new(io::stdin()).lines().fuse(); + block_on(async { + loop { + futures::select! 
{ + line = stdin.select_next_some() => { + let line :String = line.expect("Stdin not to close"); + if line.is_empty() { + continue; + } + //kad input + input_command::handle_input_command(&mut swarm,&mut client_paras,line.to_string()).await; + }, + event = swarm.select_next_some() => { + match event{ + SwarmEvent::NewListenAddr { address, .. } => { + tracing::info!("Listening on {:?}", address); + } + SwarmEvent::ConnectionEstablished { + peer_id, endpoint, .. + } => { + tracing::info!("Established connection to {:?} via {:?}", peer_id, endpoint); + swarm.behaviour_mut().kademlia.add_address(&peer_id,endpoint.get_remote_address().clone()); + let peers = swarm.connected_peers(); + for p in peers { + tracing::info!("Connected peer: {}",p); + }; + }, + SwarmEvent::ConnectionClosed { peer_id, .. } => { + tracing::info!("Disconnect {:?}", peer_id); + }, + SwarmEvent::OutgoingConnectionError{error,..} => { + tracing::debug!("OutgoingConnectionError {:?}", error); + }, + //Identify events + SwarmEvent::Behaviour(Event::Identify(event)) => { + event_handler::identify_event_handler(&mut swarm.behaviour_mut().kademlia, event); + }, + //RendezvousClient events + SwarmEvent::Behaviour(Event::Rendezvous(event)) => { + event_handler::rendezvous_client_event_handler(&mut swarm, &mut client_paras, event); + }, + //kad events + SwarmEvent::Behaviour(Event::Kademlia(event)) => { + event_handler::kad_event_handler(&mut swarm, &mut client_paras, event); + }, + //GitUploadPack events + SwarmEvent::Behaviour(Event::GitUploadPack(event)) => { + event_handler::git_upload_pack_event_handler(&mut swarm, &mut client_paras, event).await; + }, + //GitInfoRefs events + SwarmEvent::Behaviour(Event::GitInfoRefs(event)) => { + event_handler::git_info_refs_event_handler(&mut swarm, &mut client_paras, event).await; + }, + //GitObject events + SwarmEvent::Behaviour(Event::GitObject(event)) => { + event_handler::git_object_event_handler(&mut swarm, &mut client_paras, event).await; + }, + _ => { + tracing::debug!("Event: {:?}", event); + } + }; + } + } + } + }); + + Ok(()) +} diff --git a/gitp2p-rs/p2p/src/node/input_command.rs b/gitp2p-rs/p2p/src/node/input_command.rs new file mode 100644 index 0000000000000000000000000000000000000000..4f8c68dfb2349871258467f34023fab7dd82faef --- /dev/null +++ b/gitp2p-rs/p2p/src/node/input_command.rs @@ -0,0 +1,303 @@ +use super::ClientParas; +use crate::network::behaviour; +use crate::network::behaviour::{GitInfoRefsReq, GitUploadPackReq}; +use crate::node::{get_utc_timestamp, MegaRepoInfo}; +use crate::{get_pack_protocol, get_repo_full_path}; +use common::utils; +use libp2p::kad::record::Key; +use libp2p::kad::store::MemoryStore; +use libp2p::kad::{Kademlia, Quorum, Record}; +use libp2p::{PeerId, Swarm}; +use std::collections::HashSet; +use std::path::Path; +use std::str::FromStr; + +pub async fn handle_input_command( + swarm: &mut Swarm, + client_paras: &mut ClientParas, + line: String, +) { + let line = line.trim(); + if line.is_empty() { + return; + } + let mut args = line.split_whitespace(); + match args.next() { + Some("kad") => { + handle_kad_command(&mut swarm.behaviour_mut().kademlia, args.collect()); + } + Some("mega") => { + handle_mega_command(swarm, client_paras, args.collect()).await; + } + _ => { + eprintln!("expected command: kad, mega"); + } + } +} + +pub fn handle_kad_command(kademlia: &mut Kademlia, args: Vec<&str>) { + let mut args_iter = args.iter().copied(); + match args_iter.next() { + Some("get") => { + let key = { + match args_iter.next() { + Some(key) => 
Key::new(&key), + None => { + eprintln!("Expected key"); + return; + } + } + }; + kademlia.get_record(key); + } + Some("put") => { + let key = { + match args_iter.next() { + Some(key) => Key::new(&key), + None => { + eprintln!("Expected key"); + return; + } + } + }; + let value = { + match args_iter.next() { + Some(value) => value.as_bytes().to_vec(), + None => { + eprintln!("Expected value"); + return; + } + } + }; + let record = Record { + key, + value, + publisher: None, + expires: None, + }; + if let Err(e) = kademlia.put_record(record, Quorum::One) { + eprintln!("Put record failed :{}", e); + } + } + Some("k_buckets") => { + for (_, k_bucket_ref) in kademlia.kbuckets().enumerate() { + println!("k_bucket_ref.num_entries:{}", k_bucket_ref.num_entries()); + for (_, x) in k_bucket_ref.iter().enumerate() { + println!( + "PEERS[{:?}]={:?}", + x.node.key.preimage().to_string(), + x.node.value + ); + } + } + } + Some("get_peer") => { + let peer_id = match parse_peer_id(args_iter.next()) { + Some(peer_id) => peer_id, + None => { + return; + } + }; + kademlia.get_closest_peers(peer_id); + } + _ => { + eprintln!("expected command: get, put, k_buckets, get_peer"); + } + } +} + +pub async fn handle_mega_command( + swarm: &mut Swarm, + client_paras: &mut ClientParas, + args: Vec<&str>, +) { + let mut args_iter = args.iter().copied(); + match args_iter.next() { + //mega provide ${your_repo}.git + Some("provide") => { + let repo_name = { + match args_iter.next() { + Some(path) => path.to_string(), + None => { + eprintln!("Expected repo_name"); + return; + } + } + }; + if !repo_name.ends_with(".git") { + eprintln!("repo_name should end with .git"); + return; + } + let path = get_repo_full_path(&repo_name); + let pack_protocol = get_pack_protocol(&path, client_paras.storage.clone()).await; + let object_id = pack_protocol.get_head_object_id(Path::new(&path)).await; + if object_id == *utils::ZERO_ID { + eprintln!("Repository not found"); + return; + } + //Construct repoInfo + let mega_repo_info = MegaRepoInfo { + origin: swarm.local_peer_id().to_string(), + name: repo_name.clone(), + latest: object_id, + forks: vec![], + timestamp: get_utc_timestamp(), + }; + + let record = Record { + key: Key::new(&repo_name), + value: serde_json::to_vec(&mega_repo_info).unwrap(), + publisher: None, + expires: None, + }; + if let Err(e) = swarm + .behaviour_mut() + .kademlia + .put_record(record, Quorum::One) + { + eprintln!("Failed to store record:{}", e); + } + } + Some("search") => { + let repo_name = { + match args_iter.next() { + Some(path) => path.to_string(), + None => { + eprintln!("Expected repo_name"); + return; + } + } + }; + swarm + .behaviour_mut() + .kademlia + .get_record(Key::new(&repo_name)); + } + Some("clone") => { + // mega clone p2p://12D3KooWFgpUQa9WnTztcvs5LLMJmwsMoGZcrTHdt9LKYKpM4MiK/abc.git + let mega_address = { + match args_iter.next() { + Some(key) => key, + None => { + eprintln!("Expected mega_address"); + return; + } + } + }; + let (peer_id, repo_name) = match parse_mega_address(mega_address) { + Ok((peer_id, repo_name)) => (peer_id, repo_name), + Err(e) => { + eprintln!("{}", e); + return; + } + }; + let path = get_repo_full_path(repo_name); + let request_file_id = swarm.behaviour_mut().git_upload_pack.send_request( + &peer_id, + GitUploadPackReq(HashSet::new(), HashSet::new(), path), + ); + client_paras + .pending_git_upload_package + .insert(request_file_id, repo_name.to_string()); + } + Some("pull") => { + // mega pull p2p://12D3KooWFgpUQa9WnTztcvs5LLMJmwsMoGZcrTHdt9LKYKpM4MiK/abc.git + 
let mega_address = { + match args_iter.next() { + Some(key) => key, + None => { + eprintln!("Expected mega_address"); + return; + } + } + }; + let (peer_id, repo_name) = match parse_mega_address(mega_address) { + Ok((peer_id, repo_name)) => (peer_id, repo_name), + Err(e) => { + eprintln!("{}", e); + return; + } + }; + let path = get_repo_full_path(repo_name); + let pack_protocol = get_pack_protocol(&path, client_paras.storage.clone()).await; + let object_id = pack_protocol.get_head_object_id(Path::new(&path)).await; + if object_id == *utils::ZERO_ID { + eprintln!("local repo not found"); + return; + } + // Request to get git_info_refs + let request_id = swarm + .behaviour_mut() + .git_info_refs + .send_request(&peer_id, GitInfoRefsReq(path)); + client_paras + .pending_git_pull + .insert(request_id, repo_name.to_string()); + } + Some("clone-object") => { + // mega git_obj_download mega_test.git + let repo_name = { + match args_iter.next() { + Some(path) => path.to_string(), + None => { + eprintln!("Expected repo_name"); + return; + } + } + }; + if !repo_name.ends_with(".git") { + eprintln!("repo_name should end with .git"); + return; + } + + let kad_query_id = swarm + .behaviour_mut() + .kademlia + .get_record(Key::new(&repo_name)); + client_paras + .pending_repo_info_search_to_download_obj + .insert(kad_query_id, repo_name); + + // let path = get_repo_full_path(repo_name); + // let request_file_id = swarm + // .behaviour_mut() + // .git_info_refs + // .send_request(&peer_id, GitInfoRefsReq(path)); + // client_paras + // .pending_git_obj_download + // .insert(request_file_id, repo_name.to_string()); + } + _ => { + eprintln!("expected command: clone, pull, provide, clone-object"); + } + } +} + +fn parse_peer_id(peer_id_str: Option<&str>) -> Option { + match peer_id_str { + Some(peer_id) => match PeerId::from_str(peer_id) { + Ok(id) => Some(id), + Err(err) => { + eprintln!("peer_id parse error:{}", err); + None + } + }, + None => { + eprintln!("Expected peer_id"); + None + } + } +} + +fn parse_mega_address(mega_address: &str) -> Result<(PeerId, &str), String> { + // p2p://12D3KooWFgpUQa9WnTztcvs5LLMJmwsMoGZcrTHdt9LKYKpM4MiK/abc.git + let v: Vec<&str> = mega_address.split('/').collect(); + if v.len() < 4 { + return Err("mega_address invalid".to_string()); + }; + let peer_id = match PeerId::from_str(v[2]) { + Ok(peer_id) => peer_id, + Err(e) => return Err(e.to_string()), + }; + Ok((peer_id, v[3])) +} diff --git a/gitp2p-rs/p2p/src/node/mod.rs b/gitp2p-rs/p2p/src/node/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..dc03a0ec8267927e9077b21b48f86cede2116d4c --- /dev/null +++ b/gitp2p-rs/p2p/src/node/mod.rs @@ -0,0 +1,64 @@ +//! +//! +//! +//! +//! +//! 
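`parse_mega_address` above accepts the `p2p://<peer-id>/<repo>.git` form used by `mega clone` and `mega pull`. A self-contained sketch of the same parsing, using the sample address from the README (splitting on `/` yields `["p2p:", "", "<peer-id>", "<repo>.git"]`):

```rust
use libp2p::PeerId;
use std::str::FromStr;

/// Parse `p2p://<peer-id>/<repo>.git`, mirroring `parse_mega_address`.
fn parse_mega_address(mega_address: &str) -> Result<(PeerId, &str), String> {
    let v: Vec<&str> = mega_address.split('/').collect();
    if v.len() < 4 {
        return Err("mega_address invalid".to_string());
    }
    let peer_id = PeerId::from_str(v[2]).map_err(|e| e.to_string())?;
    Ok((peer_id, v[3]))
}

fn main() {
    // Sample address taken from the README above.
    let addr = "p2p://12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X/mega_test.git";
    let (peer_id, repo_name) = parse_mega_address(addr).expect("well-formed mega address");
    println!("peer: {peer_id}, repo: {repo_name}");
}
```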
+ +use database::driver::ObjectStorage; +use entity::git_obj::Model; +use libp2p::kad::QueryId; +use libp2p::rendezvous::Cookie; +use libp2p::request_response::RequestId; +use libp2p::{Multiaddr, PeerId}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; + +pub mod client; +mod input_command; +pub mod relay_server; + +#[cfg(test)] +mod tests {} + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub struct MegaRepoInfo { + pub origin: String, + pub name: String, + pub latest: String, + pub forks: Vec, + pub timestamp: i64, +} + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub struct Fork { + pub peer: String, + pub latest: String, + pub timestamp: i64, +} + +pub struct ClientParas { + pub cookie: Option, + pub rendezvous_point: Option, + pub bootstrap_node_addr: Option, + pub storage: Arc, + pub pending_git_upload_package: HashMap, + pub pending_git_pull: HashMap, + pub pending_git_obj_download: HashMap, + pub pending_repo_info_update_fork: HashMap, + pub pending_repo_info_search_to_download_obj: HashMap, + pub pending_git_obj_id_download: HashMap, + pub repo_node_list: HashMap>, + pub repo_id_need_list: Arc>>>, + // pub repo_receive_git_obj_model_list: HashMap>, + pub repo_receive_git_obj_model_list: Arc>>>, +} + +pub fn get_utc_timestamp() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64 +} diff --git a/gitp2p-rs/p2p/src/node/relay_server.rs b/gitp2p-rs/p2p/src/node/relay_server.rs new file mode 100644 index 0000000000000000000000000000000000000000..281fb13aa580825acea504a31afda7557df00cb0 --- /dev/null +++ b/gitp2p-rs/p2p/src/node/relay_server.rs @@ -0,0 +1,150 @@ +use super::input_command; +use crate::network::event_handler; +use async_std::io; +use async_std::io::prelude::BufReadExt; +use futures::executor::block_on; +use futures::stream::StreamExt; +use libp2p::kad::store::MemoryStore; +use libp2p::kad::{AddProviderOk, GetClosestPeersOk, GetProvidersOk, GetRecordOk, Kademlia, KademliaEvent, PeerRecord, PutRecordOk, QueryResult}; +use libp2p::{ + core::upgrade, + core::Transport, + identify, identity, + identity::PeerId, + noise, relay, rendezvous, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, + tcp, +}; +use std::error::Error; + +pub fn run(local_key: identity::Keypair, p2p_address: String) -> Result<(), Box> { + let local_peer_id = PeerId::from(local_key.public()); + tracing::info!("Local peer id: {local_peer_id:?}"); + + let tcp_transport = tcp::async_io::Transport::default(); + + let tcp_transport = tcp_transport + .upgrade(upgrade::Version::V1Lazy) + // .authenticate(tls::Config::new(&local_key).unwrap()) + .authenticate(noise::Config::new(&local_key)?) + .multiplex(libp2p::yamux::Config::default()) + .boxed(); + + let store = MemoryStore::new(local_peer_id); + + let behaviour = ServerBehaviour { + relay: relay::Behaviour::new(local_peer_id, Default::default()), + identify: identify::Behaviour::new(identify::Config::new( + "/mega/0.0.1".to_string(), + local_key.public(), + )), + kademlia: Kademlia::new(local_peer_id, store), + rendezvous: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()), + }; + + let mut swarm = SwarmBuilder::without_executor(tcp_transport, behaviour, local_peer_id).build(); + + // Listen on all interfaces + swarm.listen_on(p2p_address.parse()?)?; + + let mut stdin = io::BufReader::new(io::stdin()).lines().fuse(); + + block_on(async { + loop { + futures::select! 
{ + line = stdin.select_next_some() => { + let line :String = line.expect("Stdin not to close"); + if line.is_empty() { + continue; + } + //kad input + input_command::handle_kad_command(&mut swarm.behaviour_mut().kademlia,line.to_string().split_whitespace().collect()); + }, + event = swarm.select_next_some() => match event{ + SwarmEvent::Behaviour(ServerBehaviourEvent::Identify(identify::Event::Received { + info,peer_id + })) => { + swarm.add_external_address(info.observed_addr.clone()); + for listen_addr in info.listen_addrs.clone(){ + swarm.behaviour_mut().kademlia.add_address(&peer_id.clone(),listen_addr); + } + tracing::info!("Identify Event Received, peer_id :{}, info:{:?}", peer_id, info); + } + SwarmEvent::NewListenAddr { address, .. } => { + tracing::info!("Listening on {address:?}"); + } + //kad events + SwarmEvent::Behaviour(ServerBehaviourEvent::Kademlia(event)) => { + kad_event_handler(event); + } + //RendezvousServer events + SwarmEvent::Behaviour(ServerBehaviourEvent::Rendezvous(event)) => { + event_handler::rendezvous_server_event_handler(event); + }, + _ => { + tracing::debug!("Event: {:?}", event); + } + } + } + } + }); + + Ok(()) +} + +pub fn kad_event_handler(event: KademliaEvent) { + if let KademliaEvent::OutboundQueryProgressed { result, .. } = event { + match result { + QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(PeerRecord { record, peer }))) => { + let peer_id = match peer { + Some(id) => id.to_string(), + None => "local".to_string(), + }; + tracing::info!( + "Got record key[{}]={},from {}", + String::from_utf8(record.key.to_vec()).unwrap(), + String::from_utf8(record.value).unwrap(), + peer_id + ); + } + QueryResult::GetRecord(Err(err)) => { + tracing::error!("Failed to get record: {err:?}"); + } + QueryResult::PutRecord(Ok(PutRecordOk { key })) => { + tracing::info!( + "Successfully put record {:?}", + std::str::from_utf8(key.as_ref()).unwrap() + ); + } + QueryResult::PutRecord(Err(err)) => { + tracing::error!("Failed to put record: {err:?}"); + } + QueryResult::GetClosestPeers(Ok(GetClosestPeersOk { peers, .. })) => { + for x in peers { + tracing::info!("{}", x); + } + } + QueryResult::GetClosestPeers(Err(err)) => { + tracing::error!("Failed to get closest peers: {err:?}"); + } + QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { providers, .. }), ..) => { + tracing::info!("FoundProviders: {providers:?}"); + } + QueryResult::GetProviders(Err(e)) => { + tracing::error!("GetProviders error: {e:?}"); + } + QueryResult::StartProviding(Ok(AddProviderOk { key, .. }), ..) => { + tracing::info!("StartProviding: {key:?}"); + } + _ => {} + } + } +} + +#[derive(NetworkBehaviour)] +pub struct ServerBehaviour { + pub relay: relay::Behaviour, + pub identify: identify::Behaviour, + pub kademlia: Kademlia, + pub rendezvous: rendezvous::server::Behaviour, +} diff --git a/gitp2p-rs/p2p/src/peer.rs b/gitp2p-rs/p2p/src/peer.rs new file mode 100644 index 0000000000000000000000000000000000000000..14ad6c9ee6cd93a8c9769319efb60b7fa366d96e --- /dev/null +++ b/gitp2p-rs/p2p/src/peer.rs @@ -0,0 +1,69 @@ +//! +//! +//! +//! +//! 
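The `mega provide` path in `handle_mega_command` announces a repository on the DHT by serializing a `MegaRepoInfo` with serde_json and putting it under the repo name as a Kademlia record. A minimal sketch of that record construction; the struct definitions mirror node/mod.rs, and the peer id and head object id here are placeholders:

```rust
use libp2p::kad::record::Key;
use libp2p::kad::Record;
use serde::{Deserialize, Serialize};

// Mirrors the structs defined in node/mod.rs above.
#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct Fork {
    peer: String,
    latest: String,
    timestamp: i64,
}

#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct MegaRepoInfo {
    origin: String,
    name: String,
    latest: String,
    forks: Vec<Fork>,
    timestamp: i64,
}

fn main() {
    let info = MegaRepoInfo {
        // Placeholder peer id of the providing node.
        origin: "12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X".to_string(),
        name: "mega_test.git".to_string(),
        // Placeholder head object id.
        latest: "0000000000000000000000000000000000000000".to_string(),
        forks: vec![],
        timestamp: 0,
    };
    // The record is keyed by the repo name, as in `handle_mega_command`.
    let record = Record {
        key: Key::new(&info.name),
        value: serde_json::to_vec(&info).unwrap(),
        publisher: None,
        expires: None,
    };
    println!(
        "record key: {}, value bytes: {}",
        std::str::from_utf8(record.key.as_ref()).unwrap(),
        record.value.len()
    );
}
```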
+
+use super::node::client;
+use super::node::relay_server;
+use clap::Args;
+use database::DataSource;
+use libp2p::identity;
+
+/// Parameters for starting the p2p service
+#[derive(Args, Clone, Debug)]
+pub struct P2pOptions {
+    #[arg(long, default_value_t = String::from("127.0.0.1"))]
+    pub host: String,
+
+    #[arg(short, long, default_value_t = 8001)]
+    pub port: u16,
+
+    #[arg(short, long, default_value_t = String::from(""))]
+    pub bootstrap_node: String,
+
+    #[arg(short, long)]
+    pub secret_key_seed: Option<u8>,
+
+    #[arg(short, long, default_value_t = false)]
+    pub relay_server: bool,
+
+    #[arg(short, long, value_enum, default_value = "postgres")]
+    pub data_source: DataSource,
+}
+
+/// Run as a p2p node
+pub async fn run(options: &P2pOptions) -> Result<(), Box<dyn std::error::Error>> {
+    let P2pOptions {
+        host,
+        port,
+        bootstrap_node,
+        secret_key_seed,
+        relay_server,
+        data_source,
+    } = options;
+    let p2p_address = format!("/ip4/{}/tcp/{}", host, port);
+
+    // Create the node's identity: derived from a fixed seed, or generated randomly.
+    let local_key = if let Some(secret_key_seed) = secret_key_seed {
+        tracing::info!("Generate keys with fixed seed={}", secret_key_seed);
+        generate_ed25519_fix(*secret_key_seed)
+    } else {
+        tracing::info!("Generate keys randomly");
+        identity::Keypair::generate_ed25519()
+    };
+
+    if *relay_server {
+        relay_server::run(local_key, p2p_address)?;
+    } else {
+        client::run(local_key, p2p_address, bootstrap_node.clone(), *data_source).await?;
+    }
+    Ok(())
+}
+
+fn generate_ed25519_fix(secret_key_seed: u8) -> identity::Keypair {
+    let mut bytes = [0u8; 32];
+    bytes[0] = secret_key_seed;
+
+    identity::Keypair::ed25519_from_bytes(bytes).expect("only errors on wrong length")
+}
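`generate_ed25519_fix` is what makes `--secret-key-seed` useful: a fixed seed byte always produces the same keypair, so a node keeps the same PeerId (and therefore the same `p2p://` address from the README) across restarts. A small sketch of that determinism; the seed value 1 is arbitrary:

```rust
use libp2p::{identity, PeerId};

/// Same construction as `generate_ed25519_fix` above: the seed byte becomes the
/// first byte of an otherwise zeroed 32-byte ed25519 secret key.
fn keypair_from_seed(seed: u8) -> identity::Keypair {
    let mut bytes = [0u8; 32];
    bytes[0] = seed;
    identity::Keypair::ed25519_from_bytes(bytes).expect("only errors on wrong length")
}

fn main() {
    let first = PeerId::from(keypair_from_seed(1).public());
    let second = PeerId::from(keypair_from_seed(1).public());
    // Deterministic: the same seed always yields the same identity.
    assert_eq!(first, second);
    println!("peer id for seed 1: {first}");
}
```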