# iTool-Cloud-Components **Repository Path**: iToolRepos/iTool-Cloud-Components ## Basic Information - **Project Name**: iTool-Cloud-Components - **Description**: 为NetCore提供 In Process 可靠的/高速的 缓存、队列等常用分布式组件。 无第三方依赖的开发友好、运维友好型框架 - **Primary Language**: C# - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 18 - **Forks**: 0 - **Created**: 2021-11-16 - **Last Updated**: 2023-10-17 ## Categories & Tags **Categories**: Uncategorized **Tags**: 消息队列, Redis, Kafka, NetCore, 缓存 ## README # iTool.ClusterComponent #### 介绍 在项目的高速发展中我们往往会为项目引入一堆第三方的中间件来确保项目的稳健发展(如:Redis/Kafka/Zookeeper等),从而导致项目开发运维成本都直线提升。 Net的社区比尴尬选择面比较少。该组件立志于解决在Net环境中,常用的分布式工具对于第三方的依赖,以解决项目高速发展所遭遇的扩展问题。 In process 可以非常有效的降低我们的开发运维成本,优化开发体验。 基于Orleans二次开发。遵循Actor设计思想,再对大量项目瓶颈难点进行分析总结后推出的一种套解决方案。 以解决项目高速发展所遭遇的扩展问题。无第三方依赖降低运维成本,优化开发体验。 最新更新一解除对SqlServer的依赖。 #### 功能 1. 可靠的高速缓存 2. 持久可重放的高速队列 - 广播 - 多消费者负载均衡 3. 分布式单线程任务模型 4. 分布式基于状态的事务模型 5. 分布式事件溯源 6. 跨实例状态广播同步以解决数据热点导致的性能瓶颈 7. 节点间高速RPC调用 8. 基于SQLite(集成Lucene)实现的高可用的索引数据库 9. 文件存储 - 分布式文件存储 - 大文件断点传输 - 自定义图片尺寸生成预览图和热内存 #### 安装教程 直接在项目中引入 Components 项目,并还原依赖。 或者直接在包管理器中搜索:`iTool.ClusterComponent` 并安装 #### 依赖 1. SQL Server 数据库 - 配置连接后,项目启动会自查环境。 账户需要创建 库/表权限。 #### 使用说明 1. Service.ConsoleApp 对于基础的服务配置进行演示 2. Cache.ConsoleApp 对于缓存使用进行演示 3. PushStream.ConsoleApp 对于队列发布者进行演示 4. SubStream.ConsoleApp 对于队列订阅进行演示 5. MultipleService.ConsoleApp 解决数据热点演示颗粒多激活状态同步 6. DashboardClient 对于分布式可视化Monitor面板进行演示 7. Limiting.ConsoleApp 对于 分布式锁实现 | 分布式限流阀 | 分布式并发冥等性 进行演示 8. CloudClient.ConsoleApp 对于C#客户端使用演示 9. UI 对于Blozor客户端使用演示 11. FileCenter 对于文件服务使用演示 ``` C# var builder = new iToolHostBuilder(); builder.UseAdoNetClustering(new AdoNetClusterOptions { AdoNetOptions = new AdoNetOptions { DataSource = "127.0.0.1,2433", UID = "sa", PWD = "zhuJIAN320" }, EndpointsOptions = new EndpointsOptions { //AdvertisedIP = null, // 外网IP,默认为空 //Port = inputarr[0], // 指定集群端口号,默认为: 11111 //GatewayPort = inputarr[1] // 指定客户端口号,默认为: 33333 }, ClusterOptions = new ClusterIdentificationOptions(), ResponseTimeout = TimeSpan.FromSeconds(15) // Call 超时时间 }); builder.UseStreamProvider("TestStream", 20); var iToolHost = await builder.BuildAndStartAsync(); ``` ##### 发布/订阅 ``` C# // 声名订阅逻辑 public class TestSubscribeStreamHandler : SubscribeQueueHandler { string topic; public TestSubscribeStreamHandler(string topic, string streamNamespace) : base(topic, streamNamespace) { this.topic = topic; } public override Task OnErrorAsync(Exception ex) { Console.WriteLine("OnErrorAsync:" + ex.Message); return Task.CompletedTask; } public async override Task OnMessageAsync(string message, StreamSequenceToken token) { try { if (token == null) { Console.WriteLine($"topic:{this.topic},message:{message}"); } else { var key = $"{token.SequenceNumber}_{token.EventIndex}"; Console.WriteLine($"topic:{this.topic},message:{message},SequenceNumber:{token.SequenceNumber},EventIndex:{token.EventIndex}"); } } catch (Exception ex) { Console.WriteLine(ex.Message); } await Task.CompletedTask; } } // 订阅消息 var subscribeStreamHandler = new TestSubscribeStreamHandler("topic", "groupName"); await subscribeStreamHandler.StartAsync(); // 发布消息 var handler1 = new ProducerQueueHandler("topic", "groupName"); handler1.SendMessageAsync(input + $",{DateTime.Now}"); ``` ##### Cache ``` C# // KeyValue { var storageService = clusterHostClient.GetService("cacheName"); // 获取缓存 string statevalue = await storageService.GetState(); // 修改缓存 await storageService.Modify("akjsdhkasdhkjasdhkajsdhk"); // 删除缓存 await storageService.Remove(); } // Hash { var service = clusterHostClient.GetService("hashTableName"); // set await service.SetAsync("fieldv", input); // get field var fieldv = await service.GetAsync("fieldv"); // remove field await service.RemoveAsync("fieldv"); // remove hash table await service.RemoveAsync(); } // set { var service1 = clusterHostClient.GetService("setList1"); var service2 = clusterHostClient.GetService("setList2"); var service3 = clusterHostClient.GetService("setList3"); // 差集 var list = await service3.GetDifferencesAsync(new string[] { "setList2" }); // 交集 list = await service3.GetIntersectAsync(new string[] { "setList2" }); // 并集 list = await service3.GetUnionAsync(new string[] { "setList1", "setList2" }); // get set list list = await service2.GetAsync(); // 是否存在 var isExistValue = await service3.ExistsAsync("7"); // remove item await service2.RemoveAsync("4"); // remove all await service3.RemoveAsync(); } // zset { var service = clusterHostClient.GetService("zsetList"); // set await service.SetAsync("asdfhagsd15", 15); await service.SetAsync("asdfhagsd16", 16); await service.SetAsync("asdfhagsd5", 5); await service.SetAsync("asdfhagsd4", 4); await service.SetAsync("asdfhagsd8", 8); await service.SetAsync("asdfhagsd17", 17); await service.SetAsync("asdfhagsd160", 160); await service.SetAsync("asdfhagsd222", 222); await service.SetAsync("asdfhagsd999", 999); // 获取有序列表 slist = await service.GetAsync(); // 获取第10位元素 value = await service.GetByIndexAsync(10); // 获取指定 score 元素 value = await service.GetByScoreAsync(222); // 获取起始区间元素 list = await service.GetRangeAsync(10, 333); // 对应 remove 操作 } ``` ##### 分布式锁 ``` C# { var scopeProvider = new OrderlyWorkScopeProvider("lock_Name"); Parallel.For(1, 2000, new ParallelOptions { MaxDegreeOfParallelism = 4 }, async index => { using (await scopeProvider.CreateWorkUnitScopeAsync()) { // logic Console.WriteLine("get lock success:{0},{1}", index, DateTime.Now); await Task.Delay(1000); } }); } ``` ##### 分布式限流阀 ``` C# { int limit = 3; var scopeProvider = new LimitWorkScopeProvider(limit, "test_limit_name"); Parallel.For(1, 10000, new ParallelOptions { MaxDegreeOfParallelism = 4 }, async index => { using (await scopeProvider.CreateWorkUnitScopeAsync()) { // 以获取执行权限, ps:这里将支持三个并发 Console.WriteLine("get excuter success:{0},{1}", index, DateTime.Now); await Task.Delay(1000); } }); } ``` ##### 分布式冥等 ``` C# { int cacheResultSize = 100; string actionGroup = "test_request_action"; IRequestIdempotenceService requestIdempotenceService = cluster.GetService(cacheResultSize, actionGroup); Parallel.For(1, 2000, new ParallelOptions { MaxDegreeOfParallelism = 4 }, async index => { if (await requestIdempotenceService.StartIfNotExistAsync("test_request_token" + (index % 10), 10000)) { // 获取执行权限 Console.WriteLine("get excuter success:{0},{1}", index, DateTime.Now); await Task.Delay(1000); await requestIdempotenceService.SetResultAsync("test_request_token" + (index % 10), DateTime.Now); } else { // token 已处理,直接返回结果 object value = await requestIdempotenceService.GetResultAsync("test_request_token" + (index % 10)); Console.WriteLine("get result value:" + (index % 10) + value.ToString()); } }); } ``` ##### 数据库 支持分布式事务,并发读写动态负载平衡。基于Lucene和自定义函数增强功能同时增加操作效率。 无需提前建表建库,提更运行时检查。降低数据库使用复杂度,和开发效率。 ``` C# { var executor = cluster.GetSqlExecutor(); await executor.ExecuteNonQueryNoResultAsync("delete locations"); await executor.ExecuteNonQueryAsync(CityGeoInfo.GetCityGeoInfos().First().ToString()); await Parallel.ForEachAsync(CityGeoInfo.GetCityGeoInfos(), async (item, canceltoken) => { await executor.ExecuteNonQueryNoResultAsync(item.ToString()); }); // select distance(120.53,36.86,x,y) distance,CITY from locations where search(shoube(CITY),distmap(120.53,36.86,x,y),5) in ('三%',200) // select distance(120.53,36.86,x,y) distance,CITY from locations where search(must(CITY),distmap(120.53,36.86,x,y)) in ('三*',1000) order by distance(120.53,36.86,x,y) desc // search 代表走lucene // search(name,tag) like '%打游戏%' // search(name,tag, size: 20) like '%打游戏%' // search(name,tag, page:1, size:20) like '%打游戏%' 分页查询 // search(name,tag, token:'previous page token', size: 20) like '%打游戏%' 深度查询 // 复合条件: // search(shoube(CITY),distmap(120.53,36.86,x,y),5) in ('三%',200) 复合条件查询 // - shoube(...fields) 可以 // - must(...fields) 必须 // - mustnot(...fields) 必须不 // - distmap(x1,y1,x2,y2) 坐标距离索引(must) 单位km // - distmapnot(x1,y1,x2,y2) 坐标距离索引(must not) 单位km // order by sortdist(x,y) asc | desc 使用lucene 排序 // distance (x1,y1,x2,y2) 计算两点距离 单位km // 接口方法 interface iSqlProvider iSqlProvider = cluster.GetSqlExecutor(); Task<(List data, int total, string token)> ExecuteReaderAsync(string sql, params SqliteParameter[] parameters) where T : class, new(); ValueTask<(string data, int total, string token)> ExecuteReaderAsync(string sql, params SqliteParameter[] parameters); ValueTask ExecuteScalarAsync(string sql, params SqliteParameter[] parameters); ValueTask ExecuteNonQueryAsync(string sql, params SqliteParameter[] parameters); /// /// 不保证并发执行顺序 /// ValueTask ExecuteNonQueryNoResultAsync(string sql, params SqliteParameter[] parameters); ValueTask ExecuteTransactionAsync(List executeItems); ValueTask ExecuteTransactionOfLockTableAsync(List executeItems); ValueTask BatchExecuteNonQueryAsync(List executeItems); } ``` ##### 文件 文件复用功能基于文件摘要(md5) ``` C# // 文件删除 [HttpDelete("{id}")] public async Task DeleteFileAsync(string id) { iFileService iFileService = _storageService.GetService(id); await iFileService.DeleteFileAsync(); return base.Ok($"Deleted {id} successfully"); } // 文件预览 [HttpGet("{id}/{width}/{height}/view")] public async Task DownloadView(string id, int width, int height) { _logger.LogError(10, "就是报个错"); iFileService iFileService = _storageService.GetService(id); var info = await iFileService.GetFileInfoAsync(); if (info.UploadState < 200) { return default; } var fileBytes = await iFileService.GetStreamAsync(width, height); this.Response.ContentLength = fileBytes.Length; this.Response.Headers.Add("Accept-Ranges", "bytes"); this.Response.Headers.Add("Content-Range", "bytes 0-" + fileBytes.Length); return new FileStreamResult(new MemoryStream(fileBytes), info.ContentType); } // 文件下载 [HttpGet("{id}")] public async Task DownLoadFile(string id) { iFileService iFileService = _storageService.GetService(id); var info = await iFileService.GetFileInfoAsync(); MemoryStream fileStream = new MemoryStream(info.TotalLength); if (info.UploadState == 200) { var file = await iFileService.GetStreamAsync(); await fileStream.WriteAsync(file.FileStream, 0, file.FileStream.Length); } else if (info.UploadState == 201) { int page = 0; while (true) { page++; var file = await iFileService.GetStreamAsync(page); await fileStream.WriteAsync(file.FileStream, 0, file.FileStream.Length); if (file.IsEndNUmber) { break; } } } else { return Results.Ok(); } this.Response.ContentLength = info.TotalLength; this.Response.Headers.Add("Accept-Ranges", "bytes"); this.Response.Headers.Add("Content-Range", "bytes 0-" + info.TotalLength); fileStream.Position = 0; return File(fileStream, info.ContentType, string.Format("{0}{1}", id, info.SuffixName)); } // 文件管理脚本 [HttpGet("{queryScript}/query")] public async Task GetAllFileDetails(string queryScript) { iFileService iFileService = _storageService.GetService("0"); var files = await iFileService.QueryFileInfoAsync(queryScript); return Ok(files); } // 文件详情查询 [HttpGet("details/{id}")] public async Task GetFileDetails(string id) { iFileService iFileService = _storageService.GetService(id); var info = await iFileService.GetFileInfoAsync(); return Ok(info); } // 文件上传 [HttpPost] [DisableRequestSizeLimit] public async Task> UploadFile([FromForm] List Files) { List files = new List(); await Parallel.ForEachAsync(Files, async (file, token) => { await using (var stream = file.OpenReadStream()) { // Step 1 获取文件Key var retVal = MD5.Create().ComputeHash(stream); StringBuilder stringBuilder = new StringBuilder(); foreach (var item in retVal) { stringBuilder.Append(item.ToString("x2")); } string fileKey = stringBuilder.ToString(); if (files.Contains(fileKey)) { files.Add(fileKey); return; } files.Add(fileKey); // Step 2 获取Service iFileService iFileService = _storageService.GetService(fileKey); if (await iFileService.IsExistsAsync()) { // 文件已经存在 return; //return Results.Ok(fileKey); } // Step 3 定义缓冲区 int bufCount = 1024 * 128; // kb byte[] bufs = new byte[stream.Length > bufCount ? bufCount : stream.Length]; { // 如果文件小于缓冲区大小,则直接提交 if (stream.Length <= bufCount) { stream.Position = 0; await stream.ReadAsync(bufs, 0, (int)stream.Length); } await iFileService.UploadAsync(new UploadInfo { CreateDate = DateTime.Now, FileStream = stream.Length > bufCount ? new byte[0] : bufs, Role = "admin", User = "zxf", SuffixName = Path.GetExtension(file.FileName), TotalLength = stream.Length > bufCount ? 0 : bufs.Length, ContentType= file.ContentType }); } // 大文件分片上传 if (stream.Length > bufCount) { int index = 0, streamLength = (int)stream.Length, maxPage = (int)Math.Ceiling((decimal)streamLength / bufCount); stream.Position = 0; // 分片 while (true) { int sequence = await stream.ReadAsync(bufs, 0, bufCount); if (sequence == 0) { break; } index++; await iFileService.UploadPieceAsync(new UploadPiece { Number = index, FileStream = bufs.Take(sequence).ToArray(), IsEndNUmber = index == maxPage }); } await iFileService.UploadComplatedAsync(); } } }); return files; } ``` 赶快下载项目体验吧。 如果该项目能帮助到你,就给个 star 吧。 Thanks, Jian