From 04cc26ea7c33cb5c18ae72337da827e0a3cc0c86 Mon Sep 17 00:00:00 2001 From: yunsuoyan Date: Thu, 4 Dec 2025 16:24:47 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=B5=81=E5=BC=80?= =?UTF-8?q?=E5=8F=91=E6=8C=87=E5=8D=97=EF=BC=8C=E8=A1=A5=E5=85=85=E7=A4=BA?= =?UTF-8?q?=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../development_guide/data_stream/index.md | 391 ++++++++++++------ 1 file changed, 269 insertions(+), 122 deletions(-) diff --git a/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md b/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md index 4905ed07..ecd99b36 100644 --- a/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md +++ b/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md @@ -1,6 +1,6 @@ # 数据流 -openYuanrong提供了基于 Publish-SubScribe(pub-sub) 模型的数据流。通过数据流,应用可实现函数间无界数据流的数据交换,支持复杂的数据交互关系。同时,通过解耦数据生产方和消费方的同步连接关系,支持数据生产方和消费方按需异步调度。 +openYuanrong 提供了基于发布-订阅(pub/sub)模型的数据流,可实现函数间无界数据流的数据交换,支持复杂的数据交互关系。同时,数据流解耦了数据生产者和消费者,支持它们各自按需异步调度。 数据流中有四个关键概念:生产者(producer)、消费者(consumer)、流(stream)、数据项(element)。 @@ -9,17 +9,15 @@ openYuanrong提供了基于 Publish-SubScribe(pub-sub) 模型的数据流。通 - 数据项:数据以数据项的粒度在生产者和消费者间发送和接收。 - 流:生产者和消费者间不相互感知,通过流进行关联。生产者向流中发送数据,消费者订阅流并从流中接收数据。一个应用中可以有多条流,通过流名称区分。 -数据流支持多生产者和多消费者间的数据交互。在实际场景中,使用最多的是 one-to-one(一个生产者一个消费者)、many-to-one(多个生产者一个消费者)及 one-to-many(一个生产者多个消费者)。 - -数据流接口支持 C++/Java/Python 多语言,本节以 C++ 语言为例介绍流、生产者、消费者、数据项四类接口的使用。 +数据流支持多生产者和多消费者间的数据交互。在实际场景中,使用最多的是一个生产者一个消费者(one-to-one)、多个生产者一个消费者(many-to-one)及一个生产者多个消费者(one-to-many)。 ## 使用限制 -- 数据流定位在支持计算过程的中间数据流转,当前无持久化能力。若发送节点故障,则数据会丢失,此时需应用层进行故障处理,比如重启业务等。 -- 当前接口都是同步阻塞,没有类似 epoll 的多路复用能力。 -- 由于通过流解耦了生产者和消费者,生产者和消费者间不感知对方。也就是当生产者关闭时,消费者是无法感知的,需要业务层进行处理。 +- 数据流无持久化能力,发送节点故障时,数据会丢失,应用需要考虑对应的故障处,比如重启业务等。 +- 数据流都是同步接口,没有类似 epoll 的多路复用能力。 +- 流解耦了生产者和消费者,彼此间不感知对方。当生产者关闭时,消费者无法感知,需要业务层进行处理。 -## 流接口 +## 创建数据流 流代表了生产者和消费者间的发布订阅交互关系。流随着生产者或消费者的创建而隐式创建,无需应用显式创建流。 @@ -27,162 +25,311 @@ openYuanrong提供了基于 Publish-SubScribe(pub-sub) 模型的数据流。通 一条流上的生产者和消费者都关闭后,如果未操作接口删除流,流仍然存在,应用可以在这条流上继续关联新的生产者和消费者。您可以在创建生产者时指定流为自动删除(配置 `autoCleanup` 选项),当该流上所有生产者和消费者都关闭后,流将被系统自动删除。 -和流相关的操作有三个,创建生产者(可能隐式创建流)、创建消费者(可能隐式创建流)及删除流。 - -- 通过创建生产者产生流:`streamName` 为生产者关联的流名称,如果对应的流不存在,会隐式创建一个新的流。可通过 `ProducerConf` 配置生产者属性或采用默认配置。 - - ```cpp - // 接口原型 - std::shared_ptr CreateProducer(const std::string &streamName, ProducerConf producerConf = {}); - ``` - - ```cpp - // 调用示例 - try { - YR::ProducerConf producerConf{.delayFlushTime = 5, .pageSize = 1024 * 1024ul, .maxStreamSize = 1024 * 1024 * 1024ul}; - std::shared_ptr producer = YR::CreateProducer("streamName", producerConf); - } catch (YR::Exception &e) { - // ....... - } - ``` - -- 通过创建消费者产生流:`streamName` 为消费者关联的流名称,如果对应的流不存在,会隐式创建一个新的流。`SubscriptionConfig` 用于配置消费者行为,`autoAck` 配置开启[自动 ACK](development-data-stream-ack)。 - - ```cpp - // 接口原型 - std::shared_ptr Subscribe(const std::string &streamName, const SubscriptionConfig &config, bool autoAck = false); - ``` - - ```cpp - // 调用示例 +:::::{tab-set} +::::{tab-item} Python + +```python +import yr + +stream_name = "this-stream" +try: + # 配置流自动删除 + producer_config = yr.ProducerConfig(delay_flush_time=5, page_size=1024 * 1024, max_stream_size=1024 * 1024 * 1024, auto_clean_up=True) + # 创建生产者将隐式创建流 this-stream + producer = yr.create_stream_producer(stream_name, producer_config) + # 关闭生产者,流 this-stream 已无生产者或消费者关联,将被自动删除 + producer.close() + + config = yr.SubscriptionConfig("local-consumer") + # 创建消费者将再次隐私创建流 this-stream + consumer = yr.create_stream_consumer(stream_name, config) + consumer.close() + + # 消费者新创建的流需要显示删除 + yr.delete_stream(stream_name) +except RuntimeError as exp: + # 处理异常 +``` + +:::: +::::{tab-item} C++ + +```cpp +#include "yr/yr.h" + +int main(int argc, char *argv[]) +{ + std::string streamName = "this-stream"; try { - YR::SubscriptionConfig config("subName", YR::SubscriptionType::STREAM); + // 配置流自动删除 + YR::ProducerConf producerConf{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true}; + // 创建生产者将隐式创建流 this-stream + std::shared_ptr producer = YR::CreateProducer(streamName, producerConf); + // 关闭生产者,流 this-stream 已无生产者或消费者关联,将被自动删除 + producer->Close(); + + YR::SubscriptionConfig config("local-consumer", YR::SubscriptionType::STREAM); + // 创建消费者将再次隐私创建流 this-stream std::shared_ptr consumer = YR::Subscribe("streamName", config, true); + consumer->Close(); + + // 消费者新创建的流需要显示删除 + YR::DeleteStream(streamName); } catch (YR::Exception &e) { - // ....... + // 处理异常 + } +} +``` + +:::: +::::{tab-item} Java + +```java +package com.example; + +import com.yuanrong.stream.Producer; +import com.yuanrong.stream.ProducerConfig; +import com.yuanrong.stream.Consumer; +import com.yuanrong.stream.SubscriptionConfig; +import com.yuanrong.stream.SubscriptionType; +import com.yuanrong.stream.Element; + +public class Main { + public static void main(String[] args) { + String streamName = "this-stream"; + try { + // 配置流自动删除 + ProducerConfig pCfg = ProducerConfig.builder().delayFlushTimeMs(5L).pageSizeByte(1024 * 1024L).maxStreamSize(1024 * 1024 * 1024L).autoCleanup(true).build(); + // 创建生产者将隐式创建流 this-stream + Producer producer = YR.createProducer(streamName, pCfg); + // 关闭生产者,流 this-stream 已无生产者或消费者关联,将被自动删除 + producer.close(); + + SubscriptionConfig config = SubscriptionConfig.builder().subscriptionName("local_consumer").build(); + // 创建消费者将再次隐私创建流 this-stream + Consumer consumer = YR.subscribe(streamName, config); + + // 消费者新创建的流需要显示删除 + YR.deleteStream(streamName); + } catch (YRException e) { + // handle exception + } } - ``` +} +``` -- 删除流:在全局生产者和消费者个数为 0 时,该数据流不再使用,应用可使用流名称 `streamName` 主动删除流。 +:::: +::::: - ```cpp - // 接口原型 - void DeleteStream(const std::string &streamName); - ``` +## 生产数据 - ```cpp - // 调用示例 - try { - YR::DeleteStream("streamName"); - } catch (YR::Exception &e) { - // ....... - } - ``` +生产者(Producer)可向流中发送数据。生产者发送的数据会先放入缓冲区,系统根据生产者配置的 Flush 策略(发送间隔一段时间或者缓冲写满)刷新缓冲使其对消费者可见。生产者不再使用时,需要主动关闭。 + +:::::{tab-set} +::::{tab-item} Python -## 生产者接口 +```python +import yr -生产者(Producer)可向流中发送数据。 +stream_name = "this-stream" +try: + producer_config = yr.ProducerConfig(delay_flush_time=5, page_size=1024 * 1024, max_stream_size=1024 * 1024 * 1024, auto_clean_up=True) + producer = yr.create_stream_producer(stream_name, producer_config) + # 生产一条数据 + element = yr.Element(data=b"hello", id=0) + producer.send(element) -- Send 方法:生产者发送数据,数据会先放入缓冲区,系统根据生产者配置的 Flush 策略(发送间隔一段时间或者缓冲写满)刷新缓冲使其对消费者可见。 + # 关闭生产者 + producer.close() +except RuntimeError as exp: + # 处理异常 +``` -- Close 方法:关闭生产者,会触发自动 Flush 数据缓冲。一旦关闭,生产者不再可用。 +:::: +::::{tab-item} C++ - ```cpp - // 接口原型 - void Close(); - ``` +```cpp +#include "yr/yr.h" - ```cpp - // 调用示例 +int main(int argc, char *argv[]) +{ + std::string streamName = "this-stream"; try { - std::shared_ptr producer = YR::CreateProducer("streamName"); - // ....... + YR::ProducerConf producerConf{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true}; + std::shared_ptr producer = YR::CreateProducer(streamName, producerConf); + + // 生产一条数据 + std::string str = "hello"; + YR::Element element((uint8_t *)(str.c_str()), str.size()); + producer->send(element); + producer->Close(); } catch (YR::Exception &e) { - // ....... + // 处理异常 + } +} +``` + +:::: +::::{tab-item} Java + +```java +package com.example; + +import com.yuanrong.stream.Producer; +import com.yuanrong.stream.ProducerConfig; +import com.yuanrong.stream.Consumer; +import com.yuanrong.stream.SubscriptionConfig; +import com.yuanrong.stream.SubscriptionType; +import com.yuanrong.stream.Element; + +public class Main { + public static void main(String[] args) { + String streamName = "this-stream"; + try { + ProducerConfig pCfg = ProducerConfig.builder().delayFlushTimeMs(5L).pageSizeByte(1024 * 1024L).maxStreamSize(1024 * 1024 * 1024L).autoCleanup(true).build(); + Producer producer = YR.createProducer(streamName, pCfg); + + // 生产一条数据 + String data = "hello"; + ByteBuffer buffer = ByteBuffer.wrap(data.getBytes()); + Element element = new Element(0L, buffer); + producer.send(element); + + // 关闭生产者 + producer.close(); + } catch (YRException e) { + // handle exception + } } - ``` +} +``` -## 消费者接口 +:::: +::::: -消费者(Consumer)可接收流中的数据,使用 `Ack` 方法确认数据接收,以及主动关闭。 +## 消费数据 -- Receive 方法:接收数据会等待 `expectNum` 个 `elements`,当到达超时时间 `timeoutMs` 或满足期望个数的数据时,该调用返回。 +消费者(Consumer)可接收流中的数据,使用 `Ack` 方法确认数据接收。消费者不再使用时,需要主动关闭。 - ```cpp - // 接口原型 - void Receive(uint32_t timeoutMs, std::vector &outElements); - void Receive(uint32_t expectNum, uint32_t timeoutMs, std::vector &outElements); +消费者接收数据后,需要对数据进行 ACK 操作,以确认该数据及之前收到的数据都已消费完。对确认消费完的数据,系统会回收内存资源。数据流提供了自动 ACK 功能,只需在创建消费者时配置 `autoAck` 为 `true`。应用每次调用 `Receive` 操作后,系统会自动确认上一次接收的数据,应用无需再主动调用 `Ack` 方法。 - ``` +:::{note} - ```cpp - // 调用示例 - try { - YR::SubscriptionConfig config("subName", YR::SubscriptionType::STREAM); - std::shared_ptr consumer = YR::Subscribe("streamName", config); - // ...... - std::vector elements; - consumer->Receive(1, 6000, elements); - } catch (YR::Exception &e) { - // ....... - } - ``` +开启自动 ACK 后,用户需保证消费者每次调用 `Receive` 前,上一次接收到的数据已消费完。调用 `Receive` 后,继续消费上一次接收到的数据系统未定义。 + +::: + +:::{hint} -(development-data-stream-ack)= +当消费者调用 `Receive` 方法时,会获取 `Element` 对象,对象的内部指针指向实际的数据,这些数据处于应用函数和数据系统间的共享内存之中。应用需要通过 ACK 操作确认该数据已经消费完,此时数据系统才能回收该数据所占的内存资源。如应用不调用 ACK 操作,数据系统无法判断数据是否被消费,则无法回收内存资源,最终导致内存资源耗尽,系统异常。 -- Ack 方法:消费者接收数据后,需要对数据进行 ACK 操作,以确认该数据及之前收到的数据都已消费完。对确认消费完的数据,系统会回收内存资源。 +::: - 数据流提供了自动 ACK 功能,只需在创建消费者时配置 `autoAck` 为 `true`。应用每次调用 `Receive` 操作后,系统会自动确认上一次接收的数据,应用无需再主动调用 `Ack` 方法。 +:::::{tab-set} +::::{tab-item} Python - :::{note} +```python +import yr - 开启自动 ACK 后,用户需保证消费者每次调用 `Receive` 前,上一次接收到的数据已消费完。调用 `Receive` 后,继续消费上一次接收到的数据系统未定义。 +stream_name = "this-stream" +try: + producer_config = yr.ProducerConfig(delay_flush_time=5, page_size=1024 * 1024, max_stream_size=1024 * 1024 * 1024, auto_clean_up=True) + producer = yr.create_stream_producer(stream_name, producer_config) - ::: + config = yr.SubscriptionConfig("local_consumer") + consumer = yr.create_stream_consumer(stream_name, config) - :::{hint} + element_in = yr.Element(data=b"hello", id=0) + producer.send(data_in) + # 等待到一条数据或者1秒超时 + element_out = consumer.receive(1000, 1) + print("receive:" + element_out.data.decode()) - 当消费者调用 `Receive` 方法时,会获取 `Element` 对象,对象的内部指针指向实际的数据,这些数据处于应用函数和数据系统间的共享内存之中。应用需要通过 ACK 操作确认该数据已经消费完,此时数据系统才能回收该数据所占的内存资源。如应用不调用 ACK 操作,数据系统无法判断数据是否被消费,则无法回收内存资源,最终导致内存资源耗尽,系统异常。 + producer.close() + consumer.close() +except RuntimeError as exp: + # 处理异常 +``` - ::: +:::: +::::{tab-item} C++ - ```cpp - // 接口原型 - void Ack(uint64_t elementId); - ``` +```cpp +#include "yr/yr.h" - ```cpp - // 调用示例 +int main(int argc, char *argv[]) +{ + std::string streamName = "this-stream"; try { - YR::SubscriptionConfig config("subName", YR::SubscriptionType::STREAM); - std::shared_ptr consumer = YR::Subscribe("streamName", config); - // ...... - consumer->Ack("elementID"); - } catch (YR::Exception &e) { - // ....... - } + YR::ProducerConf producerConf{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true}; + std::shared_ptr producer = YR::CreateProducer(streamName, producerConf); - ``` + YR::SubscriptionConfig config("local-consumer", YR::SubscriptionType::STREAM); + std::shared_ptr consumer = YR::Subscribe("streamName", config, true); -- Close 方法:关闭消费者。一旦关闭,消费者不再可用。 + // 生产一条数据 + std::string str = "hello"; + YR::Element element((uint8_t *)(str.c_str()), str.size()); + producer->send(element); - ```cpp - // 接口原型 - void Close(); - ``` + std::vector elements; + consumer->Receive(1, 6000, elements); - ```cpp - // 调用示例 - try { - YR::SubscriptionConfig config("subName", YR::SubscriptionType::STREAM); - std::shared_ptr consumer = YR::Subscribe("streamName", config); - // ....... + producer->Close(); consumer->Close(); } catch (YR::Exception &e) { - // ....... + // 处理异常 } - ``` +} +``` + +:::: +::::{tab-item} Java + +```java +package com.example; + +import com.yuanrong.stream.Producer; +import com.yuanrong.stream.ProducerConfig; +import com.yuanrong.stream.Consumer; +import com.yuanrong.stream.SubscriptionConfig; +import com.yuanrong.stream.SubscriptionType; +import com.yuanrong.stream.Element; + +public class Main { + public static void main(String[] args) { + String streamName = "this-stream"; + try { + ProducerConfig pCfg = ProducerConfig.builder().delayFlushTimeMs(5L).pageSizeByte(1024 * 1024L).maxStreamSize(1024 * 1024 * 1024L).autoCleanup(true).build(); + Producer producer = YR.createProducer(streamName, pCfg); + + // 生产一条数据 + String data = "hello"; + ByteBuffer buffer = ByteBuffer.wrap(data.getBytes()); + Element element = new Element(0L, buffer); + producer.send(element); + + List recv = consumer.receive(3, 6000); + if (recv.isEmpty()) { + // handle empty. + } + Element e = recv.get(0); + Charset charset = Charset.forName("UTF-8"); + String res = charset.decode(e.getBuffer()).toString(); + consumer.ack(e.getId()); + + producer.close(); + consumer.close(); + } catch (YRException e) { + // handle exception + } + } +} +``` + +:::: +::::: ## 数据项 -- Gitee From 7864cd2198ea9631fa1d41d7708520bf4fa39a81 Mon Sep 17 00:00:00 2001 From: yunsuoyan Date: Thu, 4 Dec 2025 20:17:37 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=83=A8=E5=88=86?= =?UTF-8?q?=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../development_guide/data_stream/index.md | 214 +++++++++++------- 1 file changed, 137 insertions(+), 77 deletions(-) diff --git a/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md b/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md index ecd99b36..aa03d065 100644 --- a/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md +++ b/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md @@ -17,7 +17,7 @@ openYuanrong 提供了基于发布-订阅(pub/sub)模型的数据流,可 - 数据流都是同步接口,没有类似 epoll 的多路复用能力。 - 流解耦了生产者和消费者,彼此间不感知对方。当生产者关闭时,消费者无法感知,需要业务层进行处理。 -## 创建数据流 +## 创建流 流代表了生产者和消费者间的发布订阅交互关系。流随着生产者或消费者的创建而隐式创建,无需应用显式创建流。 @@ -31,6 +31,7 @@ openYuanrong 提供了基于发布-订阅(pub/sub)模型的数据流,可 ```python import yr +yr.init() stream_name = "this-stream" try: # 配置流自动删除 @@ -40,44 +41,51 @@ try: # 关闭生产者,流 this-stream 已无生产者或消费者关联,将被自动删除 producer.close() - config = yr.SubscriptionConfig("local-consumer") - # 创建消费者将再次隐私创建流 this-stream - consumer = yr.create_stream_consumer(stream_name, config) + consumer_config = yr.SubscriptionConfig("local-consumer") + # 创建消费者将再次隐式创建流 this-stream + consumer = yr.create_stream_consumer(stream_name, consumer_config) consumer.close() # 消费者新创建的流需要显示删除 yr.delete_stream(stream_name) except RuntimeError as exp: - # 处理异常 + print(exp) + +yr.finalize() ``` :::: ::::{tab-item} C++ ```cpp +#include #include "yr/yr.h" int main(int argc, char *argv[]) { + YR::Init(YR::Config{}, argc, argv) std::string streamName = "this-stream"; try { // 配置流自动删除 - YR::ProducerConf producerConf{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true}; + YR::ProducerConf pConfig{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true}; // 创建生产者将隐式创建流 this-stream - std::shared_ptr producer = YR::CreateProducer(streamName, producerConf); + std::shared_ptr producer = YR::CreateProducer(streamName, pConfig); // 关闭生产者,流 this-stream 已无生产者或消费者关联,将被自动删除 producer->Close(); - - YR::SubscriptionConfig config("local-consumer", YR::SubscriptionType::STREAM); - // 创建消费者将再次隐私创建流 this-stream - std::shared_ptr consumer = YR::Subscribe("streamName", config, true); + + YR::SubscriptionConfig sConfig("local-consumer", YR::SubscriptionType::STREAM); + // 创建消费者将再次隐式创建流 this-stream + std::shared_ptr consumer = YR::Subscribe(streamName, sConfig); consumer->Close(); // 消费者新创建的流需要显示删除 YR::DeleteStream(streamName); } catch (YR::Exception &e) { - // 处理异常 + std::cout << e.what() << std::endl; } + + YR::Finalize(); + return 0; } ``` @@ -87,6 +95,12 @@ int main(int argc, char *argv[]) ```java package com.example; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.List; + +import com.yuanrong.api.YR; +import com.yuanrong.Config; import com.yuanrong.stream.Producer; import com.yuanrong.stream.ProducerConfig; import com.yuanrong.stream.Consumer; @@ -95,25 +109,30 @@ import com.yuanrong.stream.SubscriptionType; import com.yuanrong.stream.Element; public class Main { - public static void main(String[] args) { + public static void main(String[] args) throws YRException { + YR.init(new Config()); + String streamName = "this-stream"; try { // 配置流自动删除 - ProducerConfig pCfg = ProducerConfig.builder().delayFlushTimeMs(5L).pageSizeByte(1024 * 1024L).maxStreamSize(1024 * 1024 * 1024L).autoCleanup(true).build(); + ProducerConfig pConfig = ProducerConfig.builder().delayFlushTimeMs(5L).pageSizeByte(1024 * 1024L).maxStreamSize(1024 * 1024 * 1024L).autoCleanup(true).build(); // 创建生产者将隐式创建流 this-stream - Producer producer = YR.createProducer(streamName, pCfg); + Producer producer = YR.createProducer(streamName, pConfig); // 关闭生产者,流 this-stream 已无生产者或消费者关联,将被自动删除 producer.close(); - SubscriptionConfig config = SubscriptionConfig.builder().subscriptionName("local_consumer").build(); + SubscriptionConfig sConfig = SubscriptionConfig.builder().subscriptionName("local-consumer").build(); // 创建消费者将再次隐私创建流 this-stream - Consumer consumer = YR.subscribe(streamName, config); - + Consumer consumer = YR.subscribe(streamName, sConfig); + consumer.close(); + // 消费者新创建的流需要显示删除 YR.deleteStream(streamName); } catch (YRException e) { - // handle exception + e.printStackTrace(); } + + YR.Finalize(); } } ``` @@ -131,42 +150,52 @@ public class Main { ```python import yr +yr.init() stream_name = "this-stream" try: producer_config = yr.ProducerConfig(delay_flush_time=5, page_size=1024 * 1024, max_stream_size=1024 * 1024 * 1024, auto_clean_up=True) producer = yr.create_stream_producer(stream_name, producer_config) - # 生产一条数据 - element = yr.Element(data=b"hello", id=0) + + # 生产数据 + element = yr.Element(value=b"hello", ele_id=0) producer.send(element) - # 关闭生产者 + # 主动关闭生产者 producer.close() except RuntimeError as exp: - # 处理异常 + print(exp) + +yr.finalize() ``` :::: ::::{tab-item} C++ ```cpp +#include #include "yr/yr.h" int main(int argc, char *argv[]) { + YR::Init(YR::Config{}, argc, argv) std::string streamName = "this-stream"; try { - YR::ProducerConf producerConf{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true}; - std::shared_ptr producer = YR::CreateProducer(streamName, producerConf); - - // 生产一条数据 - std::string str = "hello"; - YR::Element element((uint8_t *)(str.c_str()), str.size()); - producer->send(element); + YR::ProducerConf pConfig{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true}; + std::shared_ptr producer = YR::CreateProducer(streamName, pConfig); + // 生产数据 + std::string data = "hello"; + YR::Element element((uint8_t *)(data.c_str()), data.size()); + producer->Send(element); + + // 主动关闭生产者 producer->Close(); } catch (YR::Exception &e) { - // 处理异常 + std::cout << e.what() << std::endl; } + + YR::Finalize(); + return 0; } ``` @@ -176,6 +205,12 @@ int main(int argc, char *argv[]) ```java package com.example; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.List; + +import com.yuanrong.api.YR; +import com.yuanrong.Config; import com.yuanrong.stream.Producer; import com.yuanrong.stream.ProducerConfig; import com.yuanrong.stream.Consumer; @@ -184,13 +219,15 @@ import com.yuanrong.stream.SubscriptionType; import com.yuanrong.stream.Element; public class Main { - public static void main(String[] args) { + public static void main(String[] args) throws YRException { + YR.init(new Config()); + String streamName = "this-stream"; try { - ProducerConfig pCfg = ProducerConfig.builder().delayFlushTimeMs(5L).pageSizeByte(1024 * 1024L).maxStreamSize(1024 * 1024 * 1024L).autoCleanup(true).build(); - Producer producer = YR.createProducer(streamName, pCfg); + ProducerConfig pConfig = ProducerConfig.builder().delayFlushTimeMs(5L).pageSizeByte(1024 * 1024L).maxStreamSize(1024 * 1024 * 1024L).autoCleanup(true).build(); + Producer producer = YR.createProducer(streamName, pConfig); - // 生产一条数据 + // 生产数据 String data = "hello"; ByteBuffer buffer = ByteBuffer.wrap(data.getBytes()); Element element = new Element(0L, buffer); @@ -199,8 +236,10 @@ public class Main { // 关闭生产者 producer.close(); } catch (YRException e) { - // handle exception + e.printStackTrace(); } + + YR.Finalize(); } } ``` @@ -232,55 +271,73 @@ public class Main { ```python import yr +yr.init() stream_name = "this-stream" try: producer_config = yr.ProducerConfig(delay_flush_time=5, page_size=1024 * 1024, max_stream_size=1024 * 1024 * 1024, auto_clean_up=True) producer = yr.create_stream_producer(stream_name, producer_config) - config = yr.SubscriptionConfig("local_consumer") - consumer = yr.create_stream_consumer(stream_name, config) + consumer_config = yr.SubscriptionConfig("local-consumer") + consumer = yr.create_stream_consumer(stream_name, consumer_config) - element_in = yr.Element(data=b"hello", id=0) - producer.send(data_in) - # 等待到一条数据或者1秒超时 - element_out = consumer.receive(1000, 1) - print("receive:" + element_out.data.decode()) + element = yr.Element(value=b"hello", ele_id=0) + producer.send(element) + + # 消费数据,等待到一条数据或者1秒超时 + elements = consumer.receive(1000, 1) + for e in elements: + print("receive:" + e.data.decode()) producer.close() + # 主动关闭消费者 consumer.close() except RuntimeError as exp: - # 处理异常 + print(exp) + +yr.finalize() ``` :::: ::::{tab-item} C++ ```cpp +#include #include "yr/yr.h" int main(int argc, char *argv[]) { + YR::Init(YR::Config{}, argc, argv) std::string streamName = "this-stream"; try { - YR::ProducerConf producerConf{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true}; - std::shared_ptr producer = YR::CreateProducer(streamName, producerConf); + YR::ProducerConf pConfig{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true}; + std::shared_ptr producer = YR::CreateProducer(streamName, pConfig); - YR::SubscriptionConfig config("local-consumer", YR::SubscriptionType::STREAM); - std::shared_ptr consumer = YR::Subscribe("streamName", config, true); + YR::SubscriptionConfig sConfig("local-consumer", YR::SubscriptionType::STREAM); + std::shared_ptr consumer = YR::Subscribe(streamName, sConfig); - // 生产一条数据 - std::string str = "hello"; - YR::Element element((uint8_t *)(str.c_str()), str.size()); - producer->send(element); + std::string data = "hello"; + YR::Element element((uint8_t *)(data.c_str()), data.size()); + producer->Send(element); + // 消费数据,等待到一条数据或者1秒超时 std::vector elements; - consumer->Receive(1, 6000, elements); + consumer->Receive(1, 1000, elements); + for (auto e : elements) { + std::string str(reinterpret_cast(e.ptr), e.size); + // 手动ACK + consumer->Ack(e.id); + std::cout << "receive: " << str << std::endl; + } producer->Close(); + // 主动关闭消费者 consumer->Close(); } catch (YR::Exception &e) { - // 处理异常 + std::cout << e.what() << std::endl; } + + YR::Finalize(); + return 0; } ``` @@ -290,6 +347,12 @@ int main(int argc, char *argv[]) ```java package com.example; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.List; + +import com.yuanrong.api.YR; +import com.yuanrong.Config; import com.yuanrong.stream.Producer; import com.yuanrong.stream.ProducerConfig; import com.yuanrong.stream.Consumer; @@ -298,32 +361,40 @@ import com.yuanrong.stream.SubscriptionType; import com.yuanrong.stream.Element; public class Main { - public static void main(String[] args) { + public static void main(String[] args) throws YRException { + YR.init(new Config()); + String streamName = "this-stream"; try { - ProducerConfig pCfg = ProducerConfig.builder().delayFlushTimeMs(5L).pageSizeByte(1024 * 1024L).maxStreamSize(1024 * 1024 * 1024L).autoCleanup(true).build(); - Producer producer = YR.createProducer(streamName, pCfg); + ProducerConfig pConfig = ProducerConfig.builder().delayFlushTimeMs(5L).pageSizeByte(1024 * 1024L).maxStreamSize(1024 * 1024 * 1024L).autoCleanup(true).build(); + Producer producer = YR.createProducer(streamName, pConfig); + + SubscriptionConfig sConfig = SubscriptionConfig.builder().subscriptionName("local-consumer").build(); + Consumer consumer = YR.subscribe(streamName, sConfig); - // 生产一条数据 String data = "hello"; ByteBuffer buffer = ByteBuffer.wrap(data.getBytes()); Element element = new Element(0L, buffer); producer.send(element); - List recv = consumer.receive(3, 6000); - if (recv.isEmpty()) { - // handle empty. - } - Element e = recv.get(0); + // 消费数据,等待到一条数据或者3秒超时 Charset charset = Charset.forName("UTF-8"); - String res = charset.decode(e.getBuffer()).toString(); - consumer.ack(e.getId()); + List elements = consumer.receive(1, 3000); + for (Element e : elements) { + String str = charset.decode(e.getBuffer()).toString(); + // 手动ACK + consumer.ack(e.getId()); + System.out.println("receive: " + str); + } producer.close(); + // 主动关闭消费者 consumer.close(); } catch (YRException e) { - // handle exception + e.printStackTrace(); } + + YR.Finalize(); } } ``` @@ -331,14 +402,3 @@ public class Main { :::: ::::: -## 数据项 - -数据流以数据项为单位发送数据,数据项结构体如下: - -| **字段** | **类型** | **说明** | -| -------- | --------- | -------------- | -| **ptr** | uint8_t * | 指向数据的指针。 | -| **size** | unit64_t | 数据长度。 | -| **id** | uint64_t | 数据项的 id。 | - -`ptr` 字段指向数据的存放地址。在生产者端, `ptr` 字段指向应用的内存空间,因为发送的数据是应用填充的。在消费者端, `ptr` 字段指向的地址空间位于应用函数和数据系统间的共享内存中,这也是为什么会需要 ACK 操作的原因。 -- Gitee From 8774efe74e05fe03e2d131889756f2baed95d71f Mon Sep 17 00:00:00 2001 From: yunsuoyan Date: Thu, 4 Dec 2025 20:31:46 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=BC=96=E8=AF=91?= =?UTF-8?q?=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../development_guide/data_stream/index.md | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md b/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md index aa03d065..9209d775 100644 --- a/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md +++ b/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md @@ -63,7 +63,7 @@ yr.finalize() int main(int argc, char *argv[]) { - YR::Init(YR::Config{}, argc, argv) + YR::Init(YR::Config{}, argc, argv); std::string streamName = "this-stream"; try { // 配置流自动删除 @@ -101,6 +101,7 @@ import java.util.List; import com.yuanrong.api.YR; import com.yuanrong.Config; +import com.yuanrong.exception.YRException; import com.yuanrong.stream.Producer; import com.yuanrong.stream.ProducerConfig; import com.yuanrong.stream.Consumer; @@ -177,7 +178,7 @@ yr.finalize() int main(int argc, char *argv[]) { - YR::Init(YR::Config{}, argc, argv) + YR::Init(YR::Config{}, argc, argv); std::string streamName = "this-stream"; try { YR::ProducerConf pConfig{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true}; @@ -211,6 +212,7 @@ import java.util.List; import com.yuanrong.api.YR; import com.yuanrong.Config; +import com.yuanrong.exception.YRException; import com.yuanrong.stream.Producer; import com.yuanrong.stream.ProducerConfig; import com.yuanrong.stream.Consumer; @@ -306,7 +308,7 @@ yr.finalize() int main(int argc, char *argv[]) { - YR::Init(YR::Config{}, argc, argv) + YR::Init(YR::Config{}, argc, argv); std::string streamName = "this-stream"; try { YR::ProducerConf pConfig{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true}; @@ -353,6 +355,7 @@ import java.util.List; import com.yuanrong.api.YR; import com.yuanrong.Config; +import com.yuanrong.exception.YRException; import com.yuanrong.stream.Producer; import com.yuanrong.stream.ProducerConfig; import com.yuanrong.stream.Consumer; -- Gitee