diff --git a/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md b/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md index 4905ed0767f9da023e7d0a45fbcfe1b955ddaf97..9209d77595faf9a7461f7420b81577b24bad7984 100644 --- a/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md +++ b/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md @@ -1,6 +1,6 @@ # 数据流 -openYuanrong提供了基于 Publish-SubScribe(pub-sub) 模型的数据流。通过数据流,应用可实现函数间无界数据流的数据交换,支持复杂的数据交互关系。同时,通过解耦数据生产方和消费方的同步连接关系,支持数据生产方和消费方按需异步调度。 +openYuanrong 提供了基于发布-订阅(pub/sub)模型的数据流,可实现函数间无界数据流的数据交换,支持复杂的数据交互关系。同时,数据流解耦了数据生产者和消费者,支持它们各自按需异步调度。 数据流中有四个关键概念:生产者(producer)、消费者(consumer)、流(stream)、数据项(element)。 @@ -9,17 +9,15 @@ openYuanrong提供了基于 Publish-SubScribe(pub-sub) 模型的数据流。通 - 数据项:数据以数据项的粒度在生产者和消费者间发送和接收。 - 流:生产者和消费者间不相互感知,通过流进行关联。生产者向流中发送数据,消费者订阅流并从流中接收数据。一个应用中可以有多条流,通过流名称区分。 -数据流支持多生产者和多消费者间的数据交互。在实际场景中,使用最多的是 one-to-one(一个生产者一个消费者)、many-to-one(多个生产者一个消费者)及 one-to-many(一个生产者多个消费者)。 - -数据流接口支持 C++/Java/Python 多语言,本节以 C++ 语言为例介绍流、生产者、消费者、数据项四类接口的使用。 +数据流支持多生产者和多消费者间的数据交互。在实际场景中,使用最多的是一个生产者一个消费者(one-to-one)、多个生产者一个消费者(many-to-one)及一个生产者多个消费者(one-to-many)。 ## 使用限制 -- 数据流定位在支持计算过程的中间数据流转,当前无持久化能力。若发送节点故障,则数据会丢失,此时需应用层进行故障处理,比如重启业务等。 -- 当前接口都是同步阻塞,没有类似 epoll 的多路复用能力。 -- 由于通过流解耦了生产者和消费者,生产者和消费者间不感知对方。也就是当生产者关闭时,消费者是无法感知的,需要业务层进行处理。 +- 数据流无持久化能力,发送节点故障时,数据会丢失,应用需要考虑对应的故障处,比如重启业务等。 +- 数据流都是同步接口,没有类似 epoll 的多路复用能力。 +- 流解耦了生产者和消费者,彼此间不感知对方。当生产者关闭时,消费者无法感知,需要业务层进行处理。 -## 流接口 +## 创建流 流代表了生产者和消费者间的发布订阅交互关系。流随着生产者或消费者的创建而隐式创建,无需应用显式创建流。 @@ -27,171 +25,383 @@ openYuanrong提供了基于 Publish-SubScribe(pub-sub) 模型的数据流。通 一条流上的生产者和消费者都关闭后,如果未操作接口删除流,流仍然存在,应用可以在这条流上继续关联新的生产者和消费者。您可以在创建生产者时指定流为自动删除(配置 `autoCleanup` 选项),当该流上所有生产者和消费者都关闭后,流将被系统自动删除。 -和流相关的操作有三个,创建生产者(可能隐式创建流)、创建消费者(可能隐式创建流)及删除流。 - -- 通过创建生产者产生流:`streamName` 为生产者关联的流名称,如果对应的流不存在,会隐式创建一个新的流。可通过 `ProducerConf` 配置生产者属性或采用默认配置。 +:::::{tab-set} +::::{tab-item} Python + +```python +import yr + +yr.init() +stream_name = "this-stream" +try: + # 配置流自动删除 + producer_config = yr.ProducerConfig(delay_flush_time=5, page_size=1024 * 1024, max_stream_size=1024 * 1024 * 1024, auto_clean_up=True) + # 创建生产者将隐式创建流 this-stream + producer = yr.create_stream_producer(stream_name, producer_config) + # 关闭生产者,流 this-stream 已无生产者或消费者关联,将被自动删除 + producer.close() + + consumer_config = yr.SubscriptionConfig("local-consumer") + # 创建消费者将再次隐式创建流 this-stream + consumer = yr.create_stream_consumer(stream_name, consumer_config) + consumer.close() + + # 消费者新创建的流需要显示删除 + yr.delete_stream(stream_name) +except RuntimeError as exp: + print(exp) + +yr.finalize() +``` + +:::: +::::{tab-item} C++ + +```cpp +#include +#include "yr/yr.h" + +int main(int argc, char *argv[]) +{ + YR::Init(YR::Config{}, argc, argv); + std::string streamName = "this-stream"; + try { + // 配置流自动删除 + YR::ProducerConf pConfig{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true}; + // 创建生产者将隐式创建流 this-stream + std::shared_ptr producer = YR::CreateProducer(streamName, pConfig); + // 关闭生产者,流 this-stream 已无生产者或消费者关联,将被自动删除 + producer->Close(); - ```cpp - // 接口原型 - std::shared_ptr CreateProducer(const std::string &streamName, ProducerConf producerConf = {}); - ``` + YR::SubscriptionConfig sConfig("local-consumer", YR::SubscriptionType::STREAM); + // 创建消费者将再次隐式创建流 this-stream + std::shared_ptr consumer = YR::Subscribe(streamName, sConfig); + consumer->Close(); - ```cpp - // 调用示例 - try { - YR::ProducerConf producerConf{.delayFlushTime = 5, .pageSize = 1024 * 1024ul, .maxStreamSize = 1024 * 1024 * 1024ul}; - std::shared_ptr producer = YR::CreateProducer("streamName", producerConf); + // 消费者新创建的流需要显示删除 + YR::DeleteStream(streamName); } catch (YR::Exception &e) { - // ....... + std::cout << e.what() << std::endl; } - ``` -- 通过创建消费者产生流:`streamName` 为消费者关联的流名称,如果对应的流不存在,会隐式创建一个新的流。`SubscriptionConfig` 用于配置消费者行为,`autoAck` 配置开启[自动 ACK](development-data-stream-ack)。 + YR::Finalize(); + return 0; +} +``` + +:::: +::::{tab-item} Java + +```java +package com.example; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.List; + +import com.yuanrong.api.YR; +import com.yuanrong.Config; +import com.yuanrong.exception.YRException; +import com.yuanrong.stream.Producer; +import com.yuanrong.stream.ProducerConfig; +import com.yuanrong.stream.Consumer; +import com.yuanrong.stream.SubscriptionConfig; +import com.yuanrong.stream.SubscriptionType; +import com.yuanrong.stream.Element; + +public class Main { + public static void main(String[] args) throws YRException { + YR.init(new Config()); + + String streamName = "this-stream"; + try { + // 配置流自动删除 + ProducerConfig pConfig = ProducerConfig.builder().delayFlushTimeMs(5L).pageSizeByte(1024 * 1024L).maxStreamSize(1024 * 1024 * 1024L).autoCleanup(true).build(); + // 创建生产者将隐式创建流 this-stream + Producer producer = YR.createProducer(streamName, pConfig); + // 关闭生产者,流 this-stream 已无生产者或消费者关联,将被自动删除 + producer.close(); + + SubscriptionConfig sConfig = SubscriptionConfig.builder().subscriptionName("local-consumer").build(); + // 创建消费者将再次隐私创建流 this-stream + Consumer consumer = YR.subscribe(streamName, sConfig); + consumer.close(); + + // 消费者新创建的流需要显示删除 + YR.deleteStream(streamName); + } catch (YRException e) { + e.printStackTrace(); + } + + YR.Finalize(); + } +} +``` - ```cpp - // 接口原型 - std::shared_ptr Subscribe(const std::string &streamName, const SubscriptionConfig &config, bool autoAck = false); - ``` +:::: +::::: - ```cpp - // 调用示例 - try { - YR::SubscriptionConfig config("subName", YR::SubscriptionType::STREAM); - std::shared_ptr consumer = YR::Subscribe("streamName", config, true); - } catch (YR::Exception &e) { - // ....... - } - ``` +## 生产数据 -- 删除流:在全局生产者和消费者个数为 0 时,该数据流不再使用,应用可使用流名称 `streamName` 主动删除流。 +生产者(Producer)可向流中发送数据。生产者发送的数据会先放入缓冲区,系统根据生产者配置的 Flush 策略(发送间隔一段时间或者缓冲写满)刷新缓冲使其对消费者可见。生产者不再使用时,需要主动关闭。 - ```cpp - // 接口原型 - void DeleteStream(const std::string &streamName); - ``` +:::::{tab-set} +::::{tab-item} Python - ```cpp - // 调用示例 - try { - YR::DeleteStream("streamName"); - } catch (YR::Exception &e) { - // ....... - } - ``` +```python +import yr -## 生产者接口 +yr.init() +stream_name = "this-stream" +try: + producer_config = yr.ProducerConfig(delay_flush_time=5, page_size=1024 * 1024, max_stream_size=1024 * 1024 * 1024, auto_clean_up=True) + producer = yr.create_stream_producer(stream_name, producer_config) -生产者(Producer)可向流中发送数据。 + # 生产数据 + element = yr.Element(value=b"hello", ele_id=0) + producer.send(element) -- Send 方法:生产者发送数据,数据会先放入缓冲区,系统根据生产者配置的 Flush 策略(发送间隔一段时间或者缓冲写满)刷新缓冲使其对消费者可见。 + # 主动关闭生产者 + producer.close() +except RuntimeError as exp: + print(exp) -- Close 方法:关闭生产者,会触发自动 Flush 数据缓冲。一旦关闭,生产者不再可用。 +yr.finalize() +``` - ```cpp - // 接口原型 - void Close(); - ``` +:::: +::::{tab-item} C++ - ```cpp - // 调用示例 +```cpp +#include +#include "yr/yr.h" + +int main(int argc, char *argv[]) +{ + YR::Init(YR::Config{}, argc, argv); + std::string streamName = "this-stream"; try { - std::shared_ptr producer = YR::CreateProducer("streamName"); - // ....... + YR::ProducerConf pConfig{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true}; + std::shared_ptr producer = YR::CreateProducer(streamName, pConfig); + + // 生产数据 + std::string data = "hello"; + YR::Element element((uint8_t *)(data.c_str()), data.size()); + producer->Send(element); + + // 主动关闭生产者 producer->Close(); } catch (YR::Exception &e) { - // ....... + std::cout << e.what() << std::endl; } - ``` -## 消费者接口 + YR::Finalize(); + return 0; +} +``` + +:::: +::::{tab-item} Java + +```java +package com.example; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.List; + +import com.yuanrong.api.YR; +import com.yuanrong.Config; +import com.yuanrong.exception.YRException; +import com.yuanrong.stream.Producer; +import com.yuanrong.stream.ProducerConfig; +import com.yuanrong.stream.Consumer; +import com.yuanrong.stream.SubscriptionConfig; +import com.yuanrong.stream.SubscriptionType; +import com.yuanrong.stream.Element; + +public class Main { + public static void main(String[] args) throws YRException { + YR.init(new Config()); + + String streamName = "this-stream"; + try { + ProducerConfig pConfig = ProducerConfig.builder().delayFlushTimeMs(5L).pageSizeByte(1024 * 1024L).maxStreamSize(1024 * 1024 * 1024L).autoCleanup(true).build(); + Producer producer = YR.createProducer(streamName, pConfig); + + // 生产数据 + String data = "hello"; + ByteBuffer buffer = ByteBuffer.wrap(data.getBytes()); + Element element = new Element(0L, buffer); + producer.send(element); + + // 关闭生产者 + producer.close(); + } catch (YRException e) { + e.printStackTrace(); + } + + YR.Finalize(); + } +} +``` -消费者(Consumer)可接收流中的数据,使用 `Ack` 方法确认数据接收,以及主动关闭。 +:::: +::::: -- Receive 方法:接收数据会等待 `expectNum` 个 `elements`,当到达超时时间 `timeoutMs` 或满足期望个数的数据时,该调用返回。 +## 消费数据 - ```cpp - // 接口原型 - void Receive(uint32_t timeoutMs, std::vector &outElements); - void Receive(uint32_t expectNum, uint32_t timeoutMs, std::vector &outElements); +消费者(Consumer)可接收流中的数据,使用 `Ack` 方法确认数据接收。消费者不再使用时,需要主动关闭。 - ``` +消费者接收数据后,需要对数据进行 ACK 操作,以确认该数据及之前收到的数据都已消费完。对确认消费完的数据,系统会回收内存资源。数据流提供了自动 ACK 功能,只需在创建消费者时配置 `autoAck` 为 `true`。应用每次调用 `Receive` 操作后,系统会自动确认上一次接收的数据,应用无需再主动调用 `Ack` 方法。 - ```cpp - // 调用示例 - try { - YR::SubscriptionConfig config("subName", YR::SubscriptionType::STREAM); - std::shared_ptr consumer = YR::Subscribe("streamName", config); - // ...... - std::vector elements; - consumer->Receive(1, 6000, elements); - } catch (YR::Exception &e) { - // ....... - } - ``` +:::{note} -(development-data-stream-ack)= +开启自动 ACK 后,用户需保证消费者每次调用 `Receive` 前,上一次接收到的数据已消费完。调用 `Receive` 后,继续消费上一次接收到的数据系统未定义。 -- Ack 方法:消费者接收数据后,需要对数据进行 ACK 操作,以确认该数据及之前收到的数据都已消费完。对确认消费完的数据,系统会回收内存资源。 +::: - 数据流提供了自动 ACK 功能,只需在创建消费者时配置 `autoAck` 为 `true`。应用每次调用 `Receive` 操作后,系统会自动确认上一次接收的数据,应用无需再主动调用 `Ack` 方法。 +:::{hint} - :::{note} +当消费者调用 `Receive` 方法时,会获取 `Element` 对象,对象的内部指针指向实际的数据,这些数据处于应用函数和数据系统间的共享内存之中。应用需要通过 ACK 操作确认该数据已经消费完,此时数据系统才能回收该数据所占的内存资源。如应用不调用 ACK 操作,数据系统无法判断数据是否被消费,则无法回收内存资源,最终导致内存资源耗尽,系统异常。 - 开启自动 ACK 后,用户需保证消费者每次调用 `Receive` 前,上一次接收到的数据已消费完。调用 `Receive` 后,继续消费上一次接收到的数据系统未定义。 +::: - ::: +:::::{tab-set} +::::{tab-item} Python - :::{hint} +```python +import yr - 当消费者调用 `Receive` 方法时,会获取 `Element` 对象,对象的内部指针指向实际的数据,这些数据处于应用函数和数据系统间的共享内存之中。应用需要通过 ACK 操作确认该数据已经消费完,此时数据系统才能回收该数据所占的内存资源。如应用不调用 ACK 操作,数据系统无法判断数据是否被消费,则无法回收内存资源,最终导致内存资源耗尽,系统异常。 +yr.init() +stream_name = "this-stream" +try: + producer_config = yr.ProducerConfig(delay_flush_time=5, page_size=1024 * 1024, max_stream_size=1024 * 1024 * 1024, auto_clean_up=True) + producer = yr.create_stream_producer(stream_name, producer_config) - ::: + consumer_config = yr.SubscriptionConfig("local-consumer") + consumer = yr.create_stream_consumer(stream_name, consumer_config) - ```cpp - // 接口原型 - void Ack(uint64_t elementId); - ``` + element = yr.Element(value=b"hello", ele_id=0) + producer.send(element) - ```cpp - // 调用示例 - try { - YR::SubscriptionConfig config("subName", YR::SubscriptionType::STREAM); - std::shared_ptr consumer = YR::Subscribe("streamName", config); - // ...... - consumer->Ack("elementID"); - } catch (YR::Exception &e) { - // ....... - } + # 消费数据,等待到一条数据或者1秒超时 + elements = consumer.receive(1000, 1) + for e in elements: + print("receive:" + e.data.decode()) + + producer.close() + # 主动关闭消费者 + consumer.close() +except RuntimeError as exp: + print(exp) - ``` +yr.finalize() +``` -- Close 方法:关闭消费者。一旦关闭,消费者不再可用。 +:::: +::::{tab-item} C++ - ```cpp - // 接口原型 - void Close(); - ``` +```cpp +#include +#include "yr/yr.h" - ```cpp - // 调用示例 +int main(int argc, char *argv[]) +{ + YR::Init(YR::Config{}, argc, argv); + std::string streamName = "this-stream"; try { - YR::SubscriptionConfig config("subName", YR::SubscriptionType::STREAM); - std::shared_ptr consumer = YR::Subscribe("streamName", config); - // ....... + YR::ProducerConf pConfig{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true}; + std::shared_ptr producer = YR::CreateProducer(streamName, pConfig); + + YR::SubscriptionConfig sConfig("local-consumer", YR::SubscriptionType::STREAM); + std::shared_ptr consumer = YR::Subscribe(streamName, sConfig); + + std::string data = "hello"; + YR::Element element((uint8_t *)(data.c_str()), data.size()); + producer->Send(element); + + // 消费数据,等待到一条数据或者1秒超时 + std::vector elements; + consumer->Receive(1, 1000, elements); + for (auto e : elements) { + std::string str(reinterpret_cast(e.ptr), e.size); + // 手动ACK + consumer->Ack(e.id); + std::cout << "receive: " << str << std::endl; + } + + producer->Close(); + // 主动关闭消费者 consumer->Close(); } catch (YR::Exception &e) { - // ....... + std::cout << e.what() << std::endl; } - ``` -## 数据项 - -数据流以数据项为单位发送数据,数据项结构体如下: + YR::Finalize(); + return 0; +} +``` + +:::: +::::{tab-item} Java + +```java +package com.example; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.List; + +import com.yuanrong.api.YR; +import com.yuanrong.Config; +import com.yuanrong.exception.YRException; +import com.yuanrong.stream.Producer; +import com.yuanrong.stream.ProducerConfig; +import com.yuanrong.stream.Consumer; +import com.yuanrong.stream.SubscriptionConfig; +import com.yuanrong.stream.SubscriptionType; +import com.yuanrong.stream.Element; + +public class Main { + public static void main(String[] args) throws YRException { + YR.init(new Config()); + + String streamName = "this-stream"; + try { + ProducerConfig pConfig = ProducerConfig.builder().delayFlushTimeMs(5L).pageSizeByte(1024 * 1024L).maxStreamSize(1024 * 1024 * 1024L).autoCleanup(true).build(); + Producer producer = YR.createProducer(streamName, pConfig); + + SubscriptionConfig sConfig = SubscriptionConfig.builder().subscriptionName("local-consumer").build(); + Consumer consumer = YR.subscribe(streamName, sConfig); + + String data = "hello"; + ByteBuffer buffer = ByteBuffer.wrap(data.getBytes()); + Element element = new Element(0L, buffer); + producer.send(element); + + // 消费数据,等待到一条数据或者3秒超时 + Charset charset = Charset.forName("UTF-8"); + List elements = consumer.receive(1, 3000); + for (Element e : elements) { + String str = charset.decode(e.getBuffer()).toString(); + // 手动ACK + consumer.ack(e.getId()); + System.out.println("receive: " + str); + } + + producer.close(); + // 主动关闭消费者 + consumer.close(); + } catch (YRException e) { + e.printStackTrace(); + } + + YR.Finalize(); + } +} +``` -| **字段** | **类型** | **说明** | -| -------- | --------- | -------------- | -| **ptr** | uint8_t * | 指向数据的指针。 | -| **size** | unit64_t | 数据长度。 | -| **id** | uint64_t | 数据项的 id。 | +:::: +::::: -`ptr` 字段指向数据的存放地址。在生产者端, `ptr` 字段指向应用的内存空间,因为发送的数据是应用填充的。在消费者端, `ptr` 字段指向的地址空间位于应用函数和数据系统间的共享内存中,这也是为什么会需要 ACK 操作的原因。