diff --git a/lfs_test_server_rust/.env b/lfs_test_server_rust/.env new file mode 100644 index 0000000000000000000000000000000000000000..793db398e1da8be8a5d77c700687e5850fcfe38c --- /dev/null +++ b/lfs_test_server_rust/.env @@ -0,0 +1,3 @@ +HOST=127.0.0.1 +PORT=9999 +DATABASE_URL="mysql://root:heruoqing@127.0.0.1:3333/test" diff --git a/lfs_test_server_rust/Cargo.toml b/lfs_test_server_rust/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..b1b502a20e33fd6e0311260cf7242b43f1b756ab --- /dev/null +++ b/lfs_test_server_rust/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "gust_lfs_server" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = { version = "1.27.0", features = ["full"] } +anyhow = "1.0.69" +axum = { version = "0.6.16", features = ["multipart"] } +bytes = "1.4.0" +dotenvy = "0.15.6" +futures-util = "0.3.28" +sea-orm = { version = "0.11.2", features = [ + "sqlx-mysql", + "runtime-tokio-rustls", + "macros", +] } +serde = { version = "1.0.160", features = ["derive"] } +serde_json = "1.0.96" +sha256 = "1.1.3" +tracing = "0.1" +tower-http = { version = "0.4.0", features = ["full"] } +rand = "0.8.5" diff --git a/lfs_test_server_rust/src/config.rs b/lfs_test_server_rust/src/config.rs new file mode 100644 index 0000000000000000000000000000000000000000..7849bb9ec760a13e50163c8f6cea127dc311a9d0 --- /dev/null +++ b/lfs_test_server_rust/src/config.rs @@ -0,0 +1,196 @@ +use std::env; + +const KEY_PREFIX: &str = "LFS"; + +#[derive(Debug)] +pub struct Configuration { + pub listen: String, + pub host: String, + pub ext_origin: String, + pub meta_db: String, + pub content_path: String, + pub admin_user: String, + pub admin_pass: String, + pub cert: String, + pub key: String, + pub scheme: String, + pub public: String, + pub use_tus: String, + pub tus_host: String, +} + +impl Configuration { + pub fn is_https(&self) -> bool { + 
self.scheme.contains("https")
    }

    /// True when the `public` setting is a truthy string ("1"/"true"/"TRUE").
    pub fn is_public(&self) -> bool {
        matches!(self.public.as_str(), "1" | "true" | "TRUE")
    }

    /// True when the `use_tus` setting is a truthy string ("1"/"true"/"TRUE").
    pub fn is_using_tus(&self) -> bool {
        matches!(self.use_tus.as_str(), "1" | "true" | "TRUE")
    }

    /// Builds the configuration from `LFS_*` environment variables, with the
    /// same defaults as the reference lfs-test-server.
    ///
    /// BUG FIX: every lookup previously built its key as
    /// `format!("{}_{}", KEY_PREFIX, "")` — the literal string `"LFS_"` for
    /// every field — so no setting could ever be read from the environment.
    /// Each field now reads its own `LFS_<NAME>` key.
    pub fn init() -> Configuration {
        // Read `LFS_<suffix>`, falling back to `default` when unset or empty.
        fn env_or(suffix: &str, default: &str) -> String {
            let val = env::var(format!("{}_{}", KEY_PREFIX, suffix)).unwrap_or_default();
            if val.is_empty() {
                default.to_string()
            } else {
                val
            }
        }

        let mut config = Configuration {
            listen: {
                // A bare PORT variable overrides LFS_LISTEN, as before.
                let mut listen = env_or("LISTEN", "tcp://:8080");
                if let Ok(port) = env::var("PORT") {
                    if !port.is_empty() {
                        listen = format!("tcp://:{}", port);
                    }
                }
                listen
            },
            host: env_or("HOST", "localhost:8080"),
            // Overwritten below from scheme + host; read here only so an
            // explicit LFS_EXTORIGIN is not silently dropped before then.
            ext_origin: env_or("EXTORIGIN", ""),
            meta_db: env_or("METADB", "lfs.db"),
            content_path: env_or("CONTENTPATH", "lfs-content"),
            admin_user: env_or("ADMINUSER", ""),
            admin_pass: env_or("ADMINPASS", ""),
            cert: env_or("CERT", ""),
            key: env_or("KEY", ""),
            scheme: env_or("SCHEME", "http"),
            // NOTE(review): the default "public" is *falsy* for is_public();
            // preserved from the original — confirm whether "true" was meant.
            public: env_or("PUBLIC", "public"),
            use_tus: env_or("USETUS", "false"),
            tus_host: env_or("TUSHOST", "localhost:1080"),
        };

        config.ext_origin = format!("{}://{}", config.scheme, config.host);

        config
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_init() {
        let config = Configuration::init();
        println!("{:?}", config);
    }
}
diff --git a/lfs_test_server_rust/src/content_store.rs b/lfs_test_server_rust/src/content_store.rs
new file mode 100644
index 0000000000000000000000000000000000000000..03ddced7a3d6e982785e41ed9fd520d61da4df3a
--- /dev/null
+++ b/lfs_test_server_rust/src/content_store.rs
@@ -0,0 +1,86 @@
use crate::server::MetaObject;
use sha256::digest;
use std::fs;
use std::io::prelude::*;
use std::path;

/// Filesystem-backed object store; objects live under `base_path`, sharded
/// by oid (see `transform_key`).
pub struct ContentStore {
    base_path: String,
}

impl ContentStore {
    pub async fn new(base: String) -> ContentStore {
        fs::create_dir_all(&base).expect("Create directory failed!");
        ContentStore { base_path: base
}
    }

    /// Opens the stored object for `meta.oid`, seeking to byte `start` when
    /// `start > 0` (ranged downloads).
    pub async fn get(&self, meta: &MetaObject, start: i64) -> fs::File {
        let path = path::Path::new(&self.base_path).join(transform_key(meta.oid.to_owned()));

        let mut file = fs::File::open(&path).expect("Open file failed!");
        if start > 0 {
            file.seek(std::io::SeekFrom::Start(start as u64))
                .expect("Shift file pointer failed");
        }

        file
    }

    /// Stores `body_content` under `meta.oid`, returning `true` only when the
    /// payload length matches `meta.size` and its SHA-256 digest matches the
    /// oid.
    ///
    /// BUG FIXES vs. the original:
    /// * `Write::write` may write only part of the buffer; `write_all` is
    ///   required for correctness.
    /// * size and hash are now validated *before* writing, so a bad upload no
    ///   longer leaves a corrupt object at the final path.
    pub async fn put(&self, meta: &MetaObject, body_content: &[u8]) -> bool {
        // Reject wrong-sized or wrong-hash payloads up front.
        if body_content.len() as i64 != meta.size {
            return false;
        }
        if digest(body_content) != meta.oid {
            return false;
        }

        let path = path::Path::new(&self.base_path).join(transform_key(meta.oid.to_owned()));
        let dir = path.parent().unwrap();
        fs::create_dir_all(dir).expect("Create directory failed!");

        let mut file = fs::File::create(&path).expect("Open file failed");
        file.write_all(body_content).expect("Write file failed");
        true
    }

    /// True when an object for `meta.oid` already exists on disk.
    pub async fn exist(&self, meta: &MetaObject) -> bool {
        let path = path::Path::new(&self.base_path).join(transform_key(meta.oid.to_owned()));

        path::Path::exists(&path)
    }
}

/// Shards an oid into `aa/bb/rest` (git-lfs-style) so one directory never
/// holds every object; keys shorter than 5 chars are used verbatim.
fn transform_key(key: String) -> String {
    if key.len() < 5 {
        key
    } else {
        path::Path::new(&key[0..2])
            .join(&key[2..4])
            .join(&key[4..key.len()])
            .into_os_string()
            .into_string()
            .unwrap()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_content_store() {
        let meta = MetaObject {
            oid: "6ae8a75555209fd6c44157c0aed8016e763ff435a19cf186f76863140143ff72".to_owned(),
            size: 12,
            exist: false,
        };

        let content = "test content".as_bytes();

        let content_store = ContentStore::new("content-store".to_owned()).await;
        assert!(content_store.put(&meta, content).await);

        assert!(content_store.exist(&meta).await);
    }
}
diff --git a/lfs_test_server_rust/src/database/entity/locks.rs b/lfs_test_server_rust/src/database/entity/locks.rs
new file mode 100644
index
0000000000000000000000000000000000000000..53cb092a045f375fcf9c63c19b90d80a6d77414e --- /dev/null +++ b/lfs_test_server_rust/src/database/entity/locks.rs @@ -0,0 +1,16 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "locks")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: String, + pub data: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/lfs_test_server_rust/src/database/entity/meta.rs b/lfs_test_server_rust/src/database/entity/meta.rs new file mode 100644 index 0000000000000000000000000000000000000000..9b710694e981a387e138c25221de6d28699044ad --- /dev/null +++ b/lfs_test_server_rust/src/database/entity/meta.rs @@ -0,0 +1,17 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "meta")] +pub struct Model { + #[sea_orm(primary_key)] + pub oid: String, + pub size: i64, + pub exist: bool, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/lfs_test_server_rust/src/database/entity/mod.rs b/lfs_test_server_rust/src/database/entity/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..8ed61e438e4dc5a622de49b28a482ba0b89d795b --- /dev/null +++ b/lfs_test_server_rust/src/database/entity/mod.rs @@ -0,0 +1,4 @@ +pub mod prelude; + +pub mod locks; +pub mod meta; diff --git a/lfs_test_server_rust/src/database/entity/prelude.rs b/lfs_test_server_rust/src/database/entity/prelude.rs new file mode 100644 index 0000000000000000000000000000000000000000..4c046e4065a2af890583daddeb1a023ac8c0afa7 --- /dev/null +++ b/lfs_test_server_rust/src/database/entity/prelude.rs @@ -0,0 +1,2 @@ +pub use 
super::locks::Entity as Lock; +pub use super::meta::Entity as Metadata; diff --git a/lfs_test_server_rust/src/database/mod.rs b/lfs_test_server_rust/src/database/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..7f76a4e53c06c0640d7e56fe7960de3fa615b34f --- /dev/null +++ b/lfs_test_server_rust/src/database/mod.rs @@ -0,0 +1,9 @@ +mod entity; +pub mod mysql; + +use sea_orm::DatabaseConnection; + +#[derive(Clone)] +pub struct DataSource { + pub sea_orm: DatabaseConnection, +} diff --git a/lfs_test_server_rust/src/database/mysql/meta_storage.rs b/lfs_test_server_rust/src/database/mysql/meta_storage.rs new file mode 100644 index 0000000000000000000000000000000000000000..b7df377bdf626ee89a9192dac6f0f8f99d82d93a --- /dev/null +++ b/lfs_test_server_rust/src/database/mysql/meta_storage.rs @@ -0,0 +1,389 @@ +use crate::database::entity::locks::{self}; +use crate::database::entity::meta::{self}; +use crate::server::{Lock, MetaObject, RequestVars, User}; +use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, Set}; +use std::cmp::min; + +#[derive(Debug, Default, Clone)] +pub struct MysqlStorage { + pub connection: DatabaseConnection, +} + +impl MysqlStorage { + pub fn new(connection: DatabaseConnection) -> MysqlStorage { + MysqlStorage { connection } + } + + pub async fn get(&self, v: &RequestVars) -> Option { + let result = meta::Entity::find_by_id(v.oid.clone()) + .one(&self.connection) + .await + .unwrap(); + + match result { + Some(val) => Some(MetaObject { + oid: val.oid, + size: val.size, + exist: val.exist, + }), + None => None, + } + } + + pub async fn put(&self, v: &RequestVars) -> MetaObject { + // Check if already exist. + let result = meta::Entity::find_by_id(v.oid.clone()) + .one(&self.connection) + .await + .unwrap(); + if result.is_some() { + let result = result.unwrap(); + return MetaObject { + oid: result.oid, + size: result.size, + exist: true, + }; + } + + // Put into database if not exist. 
+ let meta = MetaObject { + oid: v.oid.to_string(), + size: v.size, + exist: false, + }; + + let meta_to = meta::ActiveModel { + oid: Set(meta.oid.to_owned()), + size: Set(meta.size.to_owned()), + exist: Set(false), + }; + + let res = meta::Entity::insert(meta_to).exec(&self.connection).await; + assert!(res.is_ok()); + + meta + } + + pub async fn delete(&self, v: &RequestVars) -> bool { + let res = meta::Entity::delete_by_id(v.oid.to_owned()) + .exec(&self.connection) + .await; + if res.is_ok() { + true + } else { + false + } + } + + pub async fn locks(&self, repo: &String) -> Vec { + let result = locks::Entity::find_by_id(repo.to_owned()) + .one(&self.connection) + .await + .unwrap(); + + match result { + Some(val) => { + let data = val.data.to_owned(); + let locks: Vec = serde_json::from_str(&data).unwrap(); + locks + } + None => { + vec![] + } + } + } + + pub async fn add_locks(&self, repo: &String, locks: Vec) -> bool { + let result = locks::Entity::find_by_id(repo.to_owned()) + .one(&self.connection) + .await + .unwrap(); + + match result { + // Update + Some(val) => { + let d = val.data.to_owned(); + let mut locks_from_data = if d != "" { + let locks_from_data: Vec = serde_json::from_str(&d).unwrap(); + locks_from_data + } else { + vec![] + }; + let mut locks = locks; + locks_from_data.append(&mut locks); + + locks_from_data.sort_by(|a, b| { + a.locked_at + .partial_cmp(&b.locked_at) + .unwrap_or(std::cmp::Ordering::Equal) + }); + let d = serde_json::to_string(&locks_from_data).unwrap(); + + let mut lock_to: locks::ActiveModel = val.into(); + lock_to.data = Set(d.to_owned()); + let res = lock_to.update(&self.connection).await; + res.is_ok() + } + // Insert + None => { + let mut locks = locks; + locks.sort_by(|a, b| { + a.locked_at + .partial_cmp(&b.locked_at) + .unwrap_or(std::cmp::Ordering::Equal) + }); + let data = serde_json::to_string(&locks).unwrap(); + let lock_to = locks::ActiveModel { + id: Set(repo.to_owned()), + data: Set(data.to_owned()), + }; + 
let res = locks::Entity::insert(lock_to).exec(&self.connection).await; + res.is_ok() + } + } + } + + pub async fn filterd_locks( + &self, + repo: &String, + path: &String, + cursor: &String, + limit: &String, + ) -> (Vec, String, bool) { + let mut locks = self.locks(&repo.to_owned()).await; + + if cursor != "" { + let mut last_seen = -1; + for (i, v) in locks.iter().enumerate() { + if v.id == *cursor { + last_seen = i as i32; + break; + } + } + + if last_seen > -1 { + locks = locks.split_off(last_seen as usize); + } else { + // Cursor not found. + return (vec![], "".to_string(), false); + } + } + + if path != "" { + let mut filterd = Vec::::new(); + for lock in locks.iter() { + if lock.path == *path { + filterd.push(Lock { + id: lock.id.to_owned(), + path: lock.path.to_owned(), + owner: User { + name: lock.owner.name.to_owned(), + }, + locked_at: lock.locked_at, + }); + } + } + locks = filterd; + } + + let mut next = "".to_string(); + if limit != "" { + let mut size = limit.parse::().unwrap(); + size = min(size, locks.len() as i64); + + if size + 1 < locks.len() as i64 { + next = locks[size as usize].id.to_owned(); + } + let _ = locks.split_off(size as usize); + } + + (locks, next, true) + } + + pub async fn delete_lock( + &self, + repo: &String, + user: &String, + id: &String, + force: bool, + ) -> (Lock, bool) { + let empty_lock = Lock { + id: "".to_owned(), + path: "".to_owned(), + owner: User { + name: "".to_owned(), + }, + locked_at: 0 as f64, + }; + let result = locks::Entity::find_by_id(repo.to_owned()) + .one(&self.connection) + .await + .unwrap(); + + match result { + // Exist, then delete. 
+ Some(val) => { + let d = val.data.to_owned(); + let locks_from_data = if d != "" { + let locks_from_data: Vec = serde_json::from_str(&d).unwrap(); + locks_from_data + } else { + vec![] + }; + + let mut new_locks = Vec::::new(); + let mut lock_to_delete = Lock { + id: "".to_owned(), + path: "".to_owned(), + owner: User { + name: "".to_owned(), + }, + locked_at: 0 as f64, + }; + + for lock in locks_from_data.iter() { + if lock.id == *id { + if lock.owner.name != *user && !force { + return (empty_lock, false); + } + lock_to_delete.id = lock.id.to_owned(); + lock_to_delete.path = lock.path.to_owned(); + lock_to_delete.owner = User { + name: lock.owner.name.to_owned(), + }; + lock_to_delete.locked_at = lock.locked_at; + } else if lock.id.len() > 0 { + new_locks.push(Lock { + id: lock.id.to_owned(), + path: lock.path.to_owned(), + owner: User { + name: lock.owner.name.to_owned(), + }, + locked_at: lock.locked_at, + }); + } + } + if lock_to_delete.id == "" { + return (empty_lock, false); + } + + // No locks remains, delete the repo from database. + if new_locks.len() == 0 { + locks::Entity::delete_by_id(repo.to_owned()) + .exec(&self.connection) + .await + .unwrap(); + + return (lock_to_delete, true); + } + + // Update remaining locks. + let data = serde_json::to_string(&new_locks).unwrap(); + + let mut lock_to: locks::ActiveModel = val.into(); + lock_to.data = Set(data.to_owned()); + let res = lock_to.update(&self.connection).await; + (lock_to_delete, res.is_ok()) + } + // Not exist, error. 
+ None => (empty_lock, false), + } + } +} + +#[cfg(test)] +mod tests { + // use super::*; + // use crate::database::mysql; + + // #[tokio::test] + // async fn test_get() { + // let db = mysql::init().await; + + // let request_vars = RequestVars { + // oid: "oid_for_test".to_string(), + // size: 77, + // user: "test_user".to_string(), + // password: "test_password".to_string(), + // repo: "test_repo".to_string(), + // authorization: "test_auth".to_string(), + // }; + // let select_res = db.get(&request_vars).await; + // println!("{:?}", select_res); + // } + + // #[tokio::test] + // async fn test_put() { + // let db = mysql::init().await; + + // let request_vars = RequestVars { + // oid: "6ae8a75555209fd6c44157c0aed8016e763ff435a19cf186f76863140143ff72".to_string(), + // size: 12, + // user: "test_user".to_string(), + // password: "test_password".to_string(), + // repo: "test_repo".to_string(), + // authorization: "test_auth".to_string(), + // }; + // let insert_res = db.put(&request_vars).await; + // println!("{:?}", insert_res); + // } + + // #[tokio::test] + // async fn test_delete() { + // let db = mysql::init().await; + + // let request_vars = RequestVars { + // oid: "oid_for_test".to_string(), + // size: 77, + // user: "test_user".to_string(), + // password: "test_password".to_string(), + // repo: "test_repo".to_string(), + // authorization: "test_auth".to_string(), + // }; + // let delete_res = db.delete(&request_vars).await; + // println!("{:?}", delete_res); + // } + + // fn new_test_lock(repo: String, path: String, user: String) -> Lock { + // Lock { + // id: repo.to_owned(), + // path: path.to_owned(), + // owner: User { + // name: user.to_owned(), + // }, + // locked_at: { + // let now = std::time::SystemTime::now(); + // let res = now.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs_f64(); + // res + // }, + // } + // } + + // #[tokio::test] + // async fn test_locks() { + // let test_locks = vec![ + // new_test_lock("test1".to_string(), 
"test_path1".to_string(), "test1_user".to_string()), + // new_test_lock("test2".to_string(), "test_path2".to_string(), "test2_user".to_string()), + // new_test_lock("test3".to_string(), "test_path3".to_string(), "test3_user".to_string()), + // ]; + + // let db = mysql::init().await; + + // db.add_locks(&"test_repo".to_string(), test_locks).await; + // } + + // #[tokio::test] + // async fn test_get_locks() { + // let db = mysql::init().await; + // let locks = db.locks(&"test_repo".to_string()).await; + + // println!("{:?}", locks); + // } + + // #[tokio::test] + // async fn test_delete_locks() { + // let db = mysql::init().await; + // let deleted = db.delete_lock(&"test_repo".to_string(), &"test1_user".to_string(), &"test1".to_string(), false).await; + // println!("{:?}", deleted); + // } +} diff --git a/lfs_test_server_rust/src/database/mysql/mod.rs b/lfs_test_server_rust/src/database/mysql/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..95e477c6c58a5faf926f81cd9c2ca1d37b735e86 --- /dev/null +++ b/lfs_test_server_rust/src/database/mysql/mod.rs @@ -0,0 +1,39 @@ +pub mod meta_storage; + +use std::env; +use std::time::Duration; + +use sea_orm::{ConnectOptions, Database}; +use tracing::log; + +use self::meta_storage::MysqlStorage; + +pub async fn init() -> MysqlStorage { + let db_url = env::var("DATABASE_URL").expect("DATABASE_URL is not set in .env file"); + let mut opt = ConnectOptions::new(db_url.to_owned()); + // max_connections is properly for double size of the cpu core + opt.max_connections(32) + .min_connections(8) + .acquire_timeout(Duration::from_secs(30)) + .connect_timeout(Duration::from_secs(20)) + .idle_timeout(Duration::from_secs(8)) + .max_lifetime(Duration::from_secs(8)) + .sqlx_logging(true) + .sqlx_logging_level(log::LevelFilter::Debug); + MysqlStorage::new( + Database::connect(opt) + .await + .expect("Database connection failed"), + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn 
test_init() { + let db = init().await; + println!("Success: {:?}", db); + } +} diff --git a/lfs_test_server_rust/src/main.rs b/lfs_test_server_rust/src/main.rs new file mode 100644 index 0000000000000000000000000000000000000000..ff02cf711179eb5d98798d5544a436ef15b7e3b1 --- /dev/null +++ b/lfs_test_server_rust/src/main.rs @@ -0,0 +1,14 @@ +pub mod config; +pub mod content_store; +pub mod database; +pub mod server; + +use anyhow::Result; + +#[tokio::main] +pub async fn main() -> Result<()> { + dotenvy::dotenv().ok(); + server::lfs_server().await.unwrap(); + + Ok(()) +} diff --git a/lfs_test_server_rust/src/server.rs b/lfs_test_server_rust/src/server.rs new file mode 100644 index 0000000000000000000000000000000000000000..6e1ce8585374dad2e3c48b54a273618b342df29c --- /dev/null +++ b/lfs_test_server_rust/src/server.rs @@ -0,0 +1,832 @@ +use crate::config::Configuration; +use crate::content_store::ContentStore; +use crate::database::mysql; +use anyhow::Result; +use axum::http::header::{ACCEPT, AUTHORIZATION}; +use bytes::{BufMut, BytesMut}; +use futures_util::StreamExt; +use rand::prelude::*; +use std::collections::HashMap; +use std::env; +use std::io::prelude::*; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::sync::Mutex; + +use axum::body::Body; +use axum::extract::{BodyStream, Path, State}; +use axum::http::{header::HeaderMap, Response, StatusCode}; +use axum::routing::{get, post}; +use axum::{Router, Server}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug)] +pub struct RequestVars { + pub oid: String, + pub size: i64, + pub user: String, + pub password: String, + pub repo: String, + pub authorization: String, +} + +impl RequestVars { + async fn download_link(&self, config: Arc>) -> String { + self.internal_link("objects".to_string(), config).await + } + + async fn upload_link(&self, config: Arc>) -> String { + 
self.internal_link("objects".to_string(), config).await
    }

    /// Builds `<ext_origin>/<user>/<repo>/<subpath>/<oid>`.
    ///
    /// BUG FIX: the original pushed `ext_origin` into a `PathBuf` *after*
    /// user/repo, producing `user/repo/http://host/objects/oid`. The URL is
    /// now assembled as a string with the origin first, matching
    /// `verify_link`.
    async fn internal_link(&self, subpath: String, config: Arc<Mutex<Configuration>>) -> String {
        let mut path = String::new();

        if !self.user.is_empty() {
            path.push('/');
            path.push_str(&self.user);
        }
        if !self.repo.is_empty() {
            path.push('/');
            path.push_str(&self.repo);
        }
        path.push('/');
        path.push_str(&subpath);
        path.push('/');
        path.push_str(&self.oid);

        let config = config.lock().await;
        format!("{}{}", config.ext_origin, path)
    }

    /// Builds the verify callback URL for this oid.
    async fn verify_link(&self, config: Arc<Mutex<Configuration>>) -> String {
        let path = format!("/verify/{}", &self.oid);
        let config = config.lock().await;
        format!("{}{}", config.ext_origin, path)
    }
}

// ---- Git LFS wire-format DTOs (generic parameters restored) ----

#[derive(Serialize, Deserialize)]
pub struct BatchVars {
    pub transfers: Vec<String>,
    pub operation: String,
    pub objects: Vec<RequestVars>,
}

#[derive(Debug)]
pub struct MetaObject {
    pub oid: String,
    pub size: i64,
    pub exist: bool,
}

#[derive(Serialize, Deserialize)]
pub struct BatchResponse {
    pub transfer: String,
    pub objects: Vec<Representation>,
}

#[derive(Serialize, Deserialize)]
pub struct Link {
    pub href: String,
    pub header: HashMap<String, String>,
    pub expires_at: f64,
}

#[derive(Serialize, Deserialize)]
pub struct ObjectError {
    pub code: i64,
    pub message: String,
}

#[derive(Serialize, Deserialize)]
pub struct Representation {
    pub oid: String,
    pub size: i64,
    pub actions: HashMap<String, Link>,
    pub error: ObjectError,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct User {
    pub name: String,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Lock {
    pub id: String,
    pub path: String,
    pub owner: User,
    pub locked_at: f64,
}

#[derive(Serialize, Deserialize)]
pub struct LockRequest {
    pub path: String,
}

#[derive(Serialize, Deserialize)]
pub struct LockResponse {
    pub lock: Lock,
    pub message: String,
}

#[derive(Serialize, Deserialize)]
pub struct UnlockRequest {
    pub force: bool,
}

#[derive(Serialize, Deserialize)]
pub struct
UnlockResponse {
    pub lock: Lock,
    pub message: String,
}

#[derive(Serialize, Deserialize)]
pub struct LockList {
    pub locks: Vec<Lock>,
    pub next_cursor: String,
    pub message: String,
}

#[derive(Serialize, Deserialize)]
pub struct VerifiableLockRequest {
    pub cursor: String,
    pub limit: i64,
}

#[derive(Serialize, Deserialize)]
pub struct VerifiableLockList {
    pub ours: Vec<Lock>,
    pub theirs: Vec<Lock>,
    pub next_cursor: String,
    pub message: String,
}

/// Shared application state: configuration plus the MySQL-backed meta store.
#[derive(Clone)]
struct AppState {
    config: Arc<Mutex<Configuration>>,
    db_storage: Arc<Mutex<MysqlStorage>>,
}

/// Binds `HOST:PORT` (from the environment) and serves the LFS routes.
pub async fn lfs_server() -> Result<(), Box<dyn std::error::Error>> {
    // load env variables
    let host = env::var("HOST").expect("HOST is not set in .env file");
    let port = env::var("PORT").expect("PORT is not set in .env file");
    let server_url = format!("{}:{}", host, port);

    let state = AppState {
        config: Arc::new(Mutex::new(Configuration::init())),
        db_storage: Arc::new(Mutex::new(mysql::init().await)),
    };

    let app = Router::new()
        // BUG FIX: the Git LFS Batch API endpoint is POST; it was registered
        // as GET, so real git-lfs clients received 405 Method Not Allowed.
        .route("/:user/:repo/objects/batch", post(batch_handler))
        .route(
            "/:user/:repo/objects/:oid",
            get(logged_method_router).put(put_handler),
        )
        .route("/:user/:repo/objects", post(post_handler))
        .route(
            "/:user/:repo/locks",
            get(locks_handler).post(create_lock_handler),
        )
        .route("/:user/:repo/locks/verify", post(locks_verify_handler))
        .route("/:user/:repo/locks/:id/unlock", post(delete_lock_handler))
        .with_state(state);

    let addr = SocketAddr::from_str(&server_url).unwrap();
    Server::bind(&addr).serve(app.into_make_service()).await?;

    Ok(())
}

/// POST /:user/:repo/objects/batch — LFS batch upload/download negotiation.
async fn batch_handler(
    state: State<AppState>,
    headers: HeaderMap,
    Path((user, repo)): Path<(String, String)>,
    mut stream: BodyStream,
) -> Result<Response<Body>, (StatusCode, String)> {
    // Extract the body to `BatchVars`.
+ let mut buffer = BytesMut::new(); + while let Some(chunk) = stream.next().await { + buffer.put(chunk.unwrap()); + } + let mut batch_vars: BatchVars = serde_json::from_slice(buffer.freeze().as_ref()).unwrap(); + + let auth = headers.get(AUTHORIZATION); + let auth = match auth { + Some(val) => val.to_str().unwrap(), + None => "", + }; + + let bvo = &mut batch_vars.objects; + for req in bvo { + req.user = user.to_string(); + req.repo = repo.to_string(); + req.authorization = auth.to_string(); + } + + let mut response_objects = Vec::::new(); + let mut use_tus = false; + let db = Arc::clone(&state.db_storage); + let config = Arc::clone(&state.config); + let config = config.lock().await; + + if batch_vars.operation == "upload" && config.is_using_tus() { + for tran in batch_vars.transfers { + if tran == "tus" { + use_tus = true; + break; + } + } + } + + let content_store = ContentStore::new(config.content_path.to_owned()).await; + let db = db.lock().await; + for object in batch_vars.objects { + let meta = db.get(&object).await; + let found = meta.is_some(); + let mut meta = meta.unwrap(); + if found && content_store.exist(&meta).await { + let conf = Arc::clone(&state.config); + response_objects.push(represent(&object, &meta, true, false, false, conf).await); + continue; + } + + // Not found + if batch_vars.operation == "upload" { + meta = db.put(&object).await; + let conf = Arc::clone(&state.config); + response_objects.push(represent(&object, &meta, false, true, use_tus, conf).await); + } else { + let rep = Representation { + oid: object.oid.to_owned(), + size: object.size, + actions: HashMap::new(), + error: ObjectError { + code: 404, + message: "Not found".to_owned(), + }, + }; + response_objects.push(rep); + } + } + + let mut batch_response = BatchResponse { + transfer: "".to_string(), + objects: response_objects, + }; + + if use_tus { + batch_response.transfer = "tus".to_string(); + } + + let json = serde_json::to_string(&batch_response).unwrap(); + let body = 
Body::from(json); + let mut resp = Response::builder(); + resp = resp.header("Content-Type", "application/vnd.git-lfs+json"); + + let resp = resp.body(body).unwrap(); + Ok(resp) +} + +async fn logged_method_router( + state: State, + headers: HeaderMap, + Path((user, repo, oid)): Path<(String, String, String)>, +) -> Result, (StatusCode, String)> { + let h = headers.get(ACCEPT); + let h = match h { + Some(val) => val.to_str().unwrap(), + None => "", + }; + + let auth = headers.get(AUTHORIZATION); + let auth = match auth { + Some(val) => val.to_str().unwrap(), + None => "", + }; + + let db = Arc::clone(&state.db_storage); + let config = Arc::clone(&state.config); + + // Load request parameters into struct. + let request_vars = RequestVars { + oid: oid.to_owned(), + size: 0, + user: user.to_owned(), + password: "".to_owned(), + repo: repo.to_owned(), + authorization: auth.to_owned(), + }; + + if h == "application/vnd.git-lfs" { + get_content_handler(request_vars, db, config).await + } else if h == "application/vnd.git-lfs+json" { + get_meta_handler(request_vars, db, config).await + } else { + Err(( + StatusCode::NOT_ACCEPTABLE, + String::from("Header not acceptable!"), + )) + } +} + +async fn get_content_handler( + request_vars: RequestVars, + db: Arc>, + config: Arc>, +) -> Result, (StatusCode, String)> { + let db = db.lock().await; + let meta = db.get(&request_vars).await.unwrap(); + + let content_store = ContentStore::new(config.lock().await.content_path.to_owned()).await; + let mut file = content_store.get(&meta, 0).await; + + let mut buffer = String::new(); + file.read_to_string(&mut buffer).unwrap(); + let mut bytes = BytesMut::new(); + bytes.put(buffer.as_bytes()); + let resp = Response::builder(); + let body = Body::from(bytes.freeze()); + Ok(resp.body(body).unwrap()) +} + +async fn get_meta_handler( + request_vars: RequestVars, + db: Arc>, + config: Arc>, +) -> Result, (StatusCode, String)> { + let db = db.lock().await; + let meta = 
db.get(&request_vars).await.unwrap(); + + let resp = Response::builder(); + let rep = represent(&request_vars, &meta, true, false, false, config).await; + let json = serde_json::to_string(&rep).unwrap(); + let body = Body::from(json); + + Ok(resp.body(body).unwrap()) +} + +async fn put_handler( + state: State, + headers: HeaderMap, + Path((user, repo, oid)): Path<(String, String, String)>, + mut stream: BodyStream, +) -> Result, (StatusCode, String)> { + let auth = headers.get(AUTHORIZATION); + let auth = match auth { + Some(val) => val.to_str().unwrap(), + None => "", + }; + + // Load request parameters into struct. + let request_vars = RequestVars { + oid: oid.to_owned(), + size: 0, + user: user.to_owned(), + password: "".to_owned(), + repo: repo.to_owned(), + authorization: auth.to_owned(), + }; + + let db = Arc::clone(&state.db_storage); + let config = Arc::clone(&state.config); + + let db = db.lock().await; + let meta = db.get(&request_vars).await.unwrap(); + + let mut buffer = BytesMut::new(); + while let Some(chunk) = stream.next().await { + buffer.put(chunk.unwrap()); + } + + let content_store = ContentStore::new(config.lock().await.content_path.to_owned()).await; + let ok = content_store.put(&meta, buffer.freeze().as_ref()).await; + if !ok { + db.delete(&request_vars).await; + return Err(( + StatusCode::NOT_ACCEPTABLE, + String::from("Header not acceptable!"), + )); + } + let mut resp = Response::builder(); + resp = resp.header("Content-Type", "application/vnd.git-lfs"); + let resp = resp.body(Body::empty()).unwrap(); + + Ok(resp) +} + +async fn post_handler( + state: State, + headers: HeaderMap, + Path((user, repo)): Path<(String, String)>, + mut stream: BodyStream, +) -> Result, (StatusCode, String)> { + let auth = headers.get(AUTHORIZATION); + let auth = match auth { + Some(val) => val.to_str().unwrap(), + None => "", + }; + + let mut buffer = BytesMut::new(); + while let Some(chunk) = stream.next().await { + buffer.put(chunk.unwrap()); + } + + let mut 
request_vars: RequestVars = serde_json::from_slice(buffer.freeze().as_ref()).unwrap();

    // Route and header values take precedence over anything in the body.
    request_vars.user = user.to_string();
    request_vars.repo = repo.to_string();
    request_vars.authorization = auth.to_string();

    let db = Arc::clone(&state.db_storage);
    let config = Arc::clone(&state.config);

    let db = db.lock().await;
    let meta = db.put(&request_vars).await;

    let content_store = ContentStore::new(config.lock().await.content_path.to_owned()).await;
    // 200 when the object is already fully stored, 202 ("accepted") otherwise.
    let status = if meta.exist && content_store.exist(&meta).await {
        200
    } else {
        202
    };

    let rep = represent(&request_vars, &meta, meta.exist, true, false, config).await;
    let json = serde_json::to_string(&rep).unwrap();

    let resp = Response::builder()
        .header("Content-Type", "application/vnd.git-lfs")
        .status(status)
        .body(Body::from(json))
        .unwrap();
    Ok(resp)
}

/// GET handler: lists every lock currently held on `{repo}` as JSON.
async fn locks_handler(
    state: State,
    Path((_, repo)): Path<(String, String)>,
) -> Result<Response<Body>, (StatusCode, String)> {
    let db = Arc::clone(&state.db_storage);
    let db = db.lock().await;
    // Empty filter arguments: no path/cursor/limit restriction.
    let (locks, next_cursor, ok) = db
        .filterd_locks(&repo, &"".to_string(), &"".to_string(), &"".to_string())
        .await;

    if !ok {
        return Err((
            StatusCode::INTERNAL_SERVER_ERROR,
            "Lookup operation failed!".to_string(),
        ));
    }

    let lock_list = LockList {
        locks: locks.clone(),
        next_cursor,
        message: "".to_string(),
    };

    let locks_response = serde_json::to_string(&lock_list).unwrap();
    Ok(Response::builder()
        .header("Content-Type", "application/vnd.git-lfs+json")
        .body(Body::from(locks_response))
        .unwrap())
}

/// POST handler: the locks/verify endpoint — splits the repo's locks into
/// "ours" (owned by the requesting user) and "theirs".
async fn locks_verify_handler(
    state: State,
    Path((user, repo)): Path<(String, String)>,
    mut stream: BodyStream,
) -> Result<Response<Body>, (StatusCode, String)> {
    let mut resp = Response::builder();
resp = resp.header("Content-Type", "application/vnd.git-lfs+json");

    // Drain the request body into one contiguous buffer.
    let mut buffer = BytesMut::new();
    while let Some(chunk) = stream.next().await {
        buffer.put(chunk.unwrap());
    }

    let verifiable_lock_request: VerifiableLockRequest =
        serde_json::from_slice(buffer.freeze().as_ref()).unwrap();
    // A limit of 0 means "use the default page size" (100).
    let limit = if verifiable_lock_request.limit == 0 {
        100
    } else {
        verifiable_lock_request.limit
    };

    let db = Arc::clone(&state.db_storage);
    let db = db.lock().await;
    let (locks, next_cursor, ok) = db
        .filterd_locks(
            &repo,
            &"".to_string(),
            &verifiable_lock_request.cursor,
            &limit.to_string(),
        )
        .await;

    if !ok {
        return Err((
            StatusCode::INTERNAL_SERVER_ERROR,
            "Lookup operation failed!".to_string(),
        ));
    }

    // Split into locks held by the requesting user and everyone else's.
    let (ours, theirs): (Vec<_>, Vec<_>) = locks
        .iter()
        .cloned()
        .partition(|lock| lock.owner.name == user);

    let lock_list = VerifiableLockList {
        ours,
        theirs,
        next_cursor,
        message: "".to_string(),
    };

    let locks_response = serde_json::to_string(&lock_list).unwrap();
    let body = Body::from(locks_response);

    Ok(resp.body(body).unwrap())
}

/// POST handler: creates a new lock on a path inside `{repo}`, rejecting the
/// request with 409 when the path is already locked.
async fn create_lock_handler(
    state: State,
    Path((user, repo)): Path<(String, String)>,
    mut stream: BodyStream,
) -> Result<Response<Body>, (StatusCode, String)> {
    let mut resp = Response::builder();
    resp = resp.header("Content-Type", "application/vnd.git-lfs+json");

    // Drain the request body into one contiguous buffer.
    let mut buffer = BytesMut::new();
    while let Some(chunk) = stream.next().await {
        buffer.put(chunk.unwrap());
    }

    let lock_request: LockRequest = serde_json::from_slice(buffer.freeze().as_ref()).unwrap();

    let db = Arc::clone(&state.db_storage);
    let db = db.lock().await;

    // Ask for at most one lock on exactly this path.
    let (locks, _, ok) = db
        .filterd_locks(
            &repo,
            &lock_request.path.to_string(),
            &"".to_string(),
            &"1".to_string(),
        )
        .await;
    if !ok {
        return Err((
            StatusCode::INTERNAL_SERVER_ERROR,
            "Failed 
when filtering locks!".to_string(),
        ));
    }

    // A lock on this path already exists — refuse to create a duplicate.
    if !locks.is_empty() {
        return Err((StatusCode::CONFLICT, "Lock already exist".to_string()));
    }

    let lock = Lock {
        // Lock id: 8 random decimal digits.
        id: {
            let mut random_num = String::new();
            let mut rng = rand::thread_rng();
            for _ in 0..8 {
                // BUG FIX: `gen_range(0..9)` is half-open and could never
                // produce the digit 9; use 0..10 so all ten digits occur.
                random_num += &(rng.gen_range(0..10)).to_string();
            }
            random_num
        },
        path: lock_request.path.to_owned(),
        owner: User {
            name: user.to_owned(),
        },
        // Creation timestamp as fractional seconds since the Unix epoch.
        locked_at: SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs_f64(),
    };

    let ok = db.add_locks(&repo, vec![lock.clone()]).await;
    if !ok {
        return Err((
            StatusCode::INTERNAL_SERVER_ERROR,
            "Failed when adding locks!".to_string(),
        ));
    }

    resp = resp.status(StatusCode::CREATED);

    let lock_response = LockResponse {
        lock,
        message: "".to_string(),
    };
    let lock_response = serde_json::to_string(&lock_response).unwrap();

    let body = Body::from(lock_response);
    Ok(resp.body(body).unwrap())
}

/// DELETE handler: removes lock `{id}` from `{repo}`, honoring the `force`
/// flag carried in the request body.
async fn delete_lock_handler(
    state: State,
    Path((user, repo, id)): Path<(String, String, String)>,
    mut stream: BodyStream,
) -> Result<Response<Body>, (StatusCode, String)> {
    // Retrieve information from request body.
+ let mut resp = Response::builder(); + resp = resp.header("Content-Type", "application/vnd.git-lfs+json"); + + let mut buffer = BytesMut::new(); + while let Some(chunk) = stream.next().await { + buffer.put(chunk.unwrap()); + } + + if id.len() == 0 { + return Err((StatusCode::BAD_REQUEST, "Invalid lock id!".to_string())); + } + + if buffer.is_empty() { + return Err(( + StatusCode::BAD_REQUEST, + "Deserialize operation failed!".to_string(), + )); + } + let unlock_request: UnlockRequest = serde_json::from_slice(buffer.freeze().as_ref()).unwrap(); + + let db = Arc::clone(&state.db_storage); + let db = db.lock().await; + + let (deleted_lock, ok) = db + .delete_lock(&repo, &user, &id, unlock_request.force) + .await; + if !ok { + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "Delete operation failed!".to_string(), + )); + } + + if deleted_lock.id == "" + && deleted_lock.path == "" + && deleted_lock.owner.name == "" + && deleted_lock.locked_at as i64 == 0 + { + return Err((StatusCode::NOT_FOUND, "Unable to find lock!".to_string())); + } + + let unlock_response = UnlockResponse { + lock: deleted_lock, + message: "".to_string(), + }; + let unlock_response = serde_json::to_string(&unlock_response).unwrap(); + + let body = Body::from(unlock_response); + Ok(resp.body(body).unwrap()) +} + +async fn represent( + rv: &RequestVars, + meta: &MetaObject, + download: bool, + upload: bool, + use_tus: bool, + config: Arc>, +) -> Representation { + let mut rep = Representation { + oid: meta.oid.to_owned(), + size: meta.size, + actions: HashMap::new(), + error: ObjectError { + code: 0, + message: "".to_owned(), + }, + }; + + let mut header: HashMap = HashMap::new(); + let mut verify_header: HashMap = HashMap::new(); + + header.insert("Accept".to_string(), "application/vnd.git-lfs".to_owned()); + + if rv.authorization.len() > 0 { + header.insert("Authorization".to_string(), rv.authorization.to_owned()); + verify_header.insert("Authorization".to_string(), 
rv.authorization.to_owned());
    }

    if download {
        rep.actions.insert(
            "download".to_string(),
            Link {
                href: {
                    let config = Arc::clone(&config);
                    rv.download_link(config).await
                },
                header: header.clone(),
                expires_at: 0.0,
            },
        );
    }

    if upload {
        rep.actions.insert(
            "upload".to_string(),
            Link {
                href: {
                    let config = Arc::clone(&config);
                    rv.upload_link(config).await
                },
                header: header.clone(),
                expires_at: 0.0,
            },
        );
        // The verify action is only advertised for tus-style uploads.
        if use_tus {
            rep.actions.insert(
                "verify".to_string(),
                Link {
                    href: {
                        let config = Arc::clone(&config);
                        rv.verify_link(config).await
                    },
                    header: verify_header.clone(),
                    expires_at: 0.0,
                },
            );
        }
    }

    rep
}

#[cfg(test)]
mod tests {
    use std::fs::write;

    use super::*;

    /// Smoke test: exercise the link builders and print the generated URLs.
    #[tokio::test]
    async fn test_internal_link() {
        let request_var = RequestVars {
            oid: "oid_for_test".to_string(),
            size: 0,
            user: "test_user".to_string(),
            password: "test_password".to_string(),
            repo: "test_repo".to_string(),
            authorization: "test_auth".to_string(),
        };

        // This configuration is shared across threads asynchronously.
+ let config = Arc::new(Mutex::new(Configuration { + listen: "test_listen".to_string(), + host: "test_host".to_string(), + ext_origin: "test_ext_origin".to_string(), + meta_db: "test_meta_db".to_string(), + content_path: "test_content_path".to_string(), + admin_user: "test_admin_user".to_string(), + admin_pass: "test_admin_pass".to_string(), + cert: "test_cert".to_string(), + key: "test_key".to_string(), + scheme: "test_scheme".to_string(), + public: "test_public".to_string(), + use_tus: "test_use_tus".to_string(), + tus_host: "test_tus_host".to_string(), + })); + + let res = request_var + .internal_link("test_subpath".to_string(), Arc::clone(&config)) + .await; + println!("{:?}", res); + + let res = request_var.upload_link(Arc::clone(&config)).await; + println!("{:?}", res); + + let res = request_var.verify_link(Arc::clone(&config)).await; + println!("{:?}", res); + } + + #[tokio::test] + async fn test_json() { + let request_var = RequestVars { + oid: "oid_for_test".to_string(), + size: 0, + user: "test_user".to_string(), + password: "test_password".to_string(), + repo: "test_repo".to_string(), + authorization: "test_auth".to_string(), + }; + + let json = serde_json::to_string(&request_var).unwrap(); + write("jsonfile.txt", json).unwrap(); + } +}