diff --git a/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md b/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md new file mode 100644 index 0000000000000000000000000000000000000000..bc2ff3fec55031c20177ba40023575c1ea54717f --- /dev/null +++ b/docs/multi_language_function_programming_interface/development_guide/data_stream/index.md @@ -0,0 +1,197 @@ +# 数据流 + +openYuanrong提供了基于 Publish-SubScribe(pub-sub) 模型的数据流。通过数据流,应用可实现函数间无界数据流的数据交换,支持复杂的数据交互关系。同时,通过解耦数据生产方和消费方的同步连接关系,支持数据生产方和消费方按需异步调度。 + +数据流中有四个关键概念:生产者(producer)、消费者(consumer)、流(stream)、数据项(element)。 + +- 生产者:生产者是无界数据流的发起端,产生并发送数据。 +- 消费者:消费者是无界数据流的接收端,消费数据。 +- 数据项:数据以数据项的粒度在生产者和消费者间发送和接收。 +- 流:生产者和消费者间不相互感知,通过流进行关联。生产者向流中发送数据,消费者订阅流并从流中接收数据。一个应用中可以有多条流,通过流名称区分。 + +数据流支持多生产者和多消费者间的数据交互。在实际场景中,使用最多的是 one-to-one(一个生产者一个消费者)、many-to-one(多个生产者一个消费者)及 one-to-many(一个生产者多个消费者)。 + +数据流接口支持 C++/Java/Python 多语言,本节以 C++ 语言为例介绍流、生产者、消费者、数据项四类接口的使用。 + +## 使用限制 + +- 数据流定位在支持计算过程的中间数据流转,当前无持久化能力。若发送节点故障,则数据会丢失,此时需应用层进行故障处理,比如重启业务等。 +- 当前接口都是同步阻塞,没有类似 epoll 的多路复用能力。 +- 由于通过流解耦了生产者和消费者,生产者和消费者间不感知对方。也就是当生产者关闭时,消费者是无法感知的,需要业务层进行处理。 + +## 流接口 + +流代表了生产者和消费者间的发布订阅交互关系。流随着生产者或消费者的创建而隐式创建,无需应用显式创建流。 + +创建一个生产者或消费者时,需要指定它所关联的流,不同的流通过流名称区分。如果流已经存在,新创建的生产者或消费者会关联到这条流上。如流不存在,在创建生产者或消费者时,系统会隐式创建一个新流,并关联它到指定的流名称。 + +一条流上的生产者和消费者都关闭后,如果未操作接口删除流,流仍然存在,应用可以在这条流上继续关联新的生产者和消费者。您可以在创建生产者时指定流为自动删除 (配置 `autoCleanup` 选项),当该流上所有生产者和消费者都关闭后,流将被系统自动删除。 + +和流相关的操作有三个,创建生产者(可能隐式创建流)、创建消费者(可能隐式创建流)及删除流。 + +- 通过创建生产者产生流:`streamName` 为生产者关联的流名称,如果对应的流不存在,会隐式创建一个新的流。可通过 `ProducerConf` 配置生产者属性或采用默认配置。 + + ```cpp + // 接口原型 + std::shared_ptr CreateProducer(const std::string &streamName, ProducerConf producerConf = {}); + ``` + + ```cpp + // 调用示例 + try { + YR::ProducerConf producerConf{.delayFlushTime = 5, .pageSize = 1024 * 1024ul, .maxStreamSize = 1024 * 1024 * 1024ul}; + std::shared_ptr producer = YR::CreateProducer("streamName", producerConf); + } catch (YR::Exception &e) { + // ....... + } + ``` + +- 通过创建消费者产生流:`streamName` 为消费者关联的流名称,如果对应的流不存在,会隐式创建一个新的流。`SubscriptionConfig` 用于配置消费者行为,`autoAck` 配置开启[自动 ACK](development-data-stream-ack)。 + + ```cpp + // 接口原型 + std::shared_ptr Subscribe(const std::string &streamName, const SubscriptionConfig &config, bool autoAck = false); + ``` + + ```cpp + // 调用示例 + try { + YR::SubscriptionConfig config("subName", YR::SubscriptionType::STREAM); + std::shared_ptr consumer = YR::Subscribe("streamName", config, true); + } catch (YR::Exception &e) { + // ....... + } + ``` + +- 删除流:在全局生产者和消费者个数为 0 时,该数据流不再使用,应用可使用流名称 `streamName` 主动删除流。 + + ```cpp + // 接口原型 + void DeleteStream(const std::string &streamName); + ``` + + ```cpp + // 调用示例 + try { + YR::DeleteStream("streamName"); + } catch (YR::Exception &e) { + // ....... + } + ``` + +## 生产者接口 + +生产者(Producer)可向流中发送数据。 + +- Send 方法:生产者发送数据,数据会先放入缓冲区,系统根据生产者配置的 Flush 策略 (发送间隔一段时间或者缓冲写满) 刷新缓冲使其对消费者可见。 + +- Close 方法:关闭生产者,会触发自动 Flush 数据缓冲。一旦关闭,生产者不再可用。 + + ```cpp + // 接口原型 + void Close(); + ``` + + ```cpp + // 调用示例 + try { + std::shared_ptr producer = YR::CreateProducer("streamName"); + // ....... + producer->Close(); + } catch (YR::Exception &e) { + // ....... + } + ``` + +## 消费者接口 + +消费者(Consumer)可接收流中的数据,使用 `Ack` 方法确认数据接收,以及主动关闭。 + +- Receive 方法:接收数据会等待 `expectNum` 个 `elements`,当到达超时时间 `timeoutMs` 或满足期望个数的数据时,该调用返回。 + + ```cpp + // 接口原型 + void Receive(uint32_t timeoutMs, std::vector &outElements); + void Receive(uint32_t expectNum, uint32_t timeoutMs, std::vector &outElements); + + ``` + + ```cpp + // 调用示例 + try { + YR::SubscriptionConfig config("subName", YR::SubscriptionType::STREAM); + std::shared_ptr consumer = YR::Subscribe("streamName", config); + // ...... + std::vector elements; + consumer->Receive(1, 6000, elements); + } catch (YR::Exception &e) { + // ....... + } + ``` + +(development-data-stream-ack)= + +- Ack 方法:消费者接收数据后,需要对数据进行 ACK 操作,以确认该数据及之前收到的数据都已消费完。对确认消费完的数据,系统会回收内存资源。 + + 数据流提供了自动 ACK 功能,只需在创建消费者时配置 `autoAck` 为 `true`。应用每次调用 `Receive` 操作后,系统会自动确认上一次接收的数据,应用无需再主动调用 `Ack` 方法。 + + :::{note} + + 开启自动 ACK 后,用户需保证消费者每次调用 `Receive` 前,上一次接收到的数据已消费完。调用 `Receive` 后,继续消费上一次接收到的数据系统未定义。 + + ::: + + :::{hint} + + 当消费者调用 `Receive` 方法时,会获取 `Element` 对象,对象的内部指针指向实际的数据,这些数据处于应用函数和数据系统间的共享内存之中。应用需要通过 ACK 操作确认该数据已经消费完,此时数据系统才能回收该数据所占的内存资源。如应用不调用 ACK 操作,数据系统无法判断数据是否被消费,则无法回收内存资源,最终导致内存资源耗尽,系统异常。 + + ::: + + ```cpp + // 接口原型 + void Ack(uint64_t elementId); + ``` + + ```cpp + // 调用示例 + try { + YR::SubscriptionConfig config("subName", YR::SubscriptionType::STREAM); + std::shared_ptr consumer = YR::Subscribe("streamName", config); + // ...... + consumer->Ack("elementID"); + } catch (YR::Exception &e) { + // ....... + } + + ``` + +- Close 方法:关闭消费者。一旦关闭,消费者不再可用。 + + ```cpp + // 接口原型 + void Close(); + ``` + + ```cpp + // 调用示例 + try { + YR::SubscriptionConfig config("subName", YR::SubscriptionType::STREAM); + std::shared_ptr consumer = YR::Subscribe("streamName", config); + // ....... + consumer->Close(); + } catch (YR::Exception &e) { + // ....... + } + ``` + +## 数据项 + +数据流以数据项为单位发送数据,数据项结构体如下: + +| **字段** | **类型** | **说明** | +| -------- | --------- | -------------- | +| **ptr** | uint8_t * | 指向数据的指针。 | +| **size** | unit64_t | 数据长度。 | +| **id** | uint64_t | 数据项的 id。 | + +`ptr` 字段指向数据的存放地址。在生产者端, `ptr` 字段指向应用的内存空间,因为发送的数据是应用填充的。在消费者端, `ptr` 字段指向的地址空间位于应用函数和数据系统间的共享内存中,这也是为什么会需要 ACK 操作的原因。 diff --git a/docs/multi_language_function_programming_interface/development_guide/index.md b/docs/multi_language_function_programming_interface/development_guide/index.md index 7e62fb092a0928bf2b9673bd238d20aaa23f4f9e..a6fe4e823ad3107ff778b9392612432f9f22e5ad 100644 --- a/docs/multi_language_function_programming_interface/development_guide/index.md +++ b/docs/multi_language_function_programming_interface/development_guide/index.md @@ -9,6 +9,7 @@ stateful_function/index stateless_function/index data_object/index + data_stream/index scheduling/index ``` @@ -19,6 +20,7 @@ openYuanrong 函数开发: - [有状态函数](./stateful_function/index.md) - [无状态函数](./stateless_function/index.md) - [数据对象](./data_object/index.md) +- [数据流](./data_stream/index.md) 调度 openYuanrong 函数: diff --git a/docs/multi_language_function_programming_interface/examples/index.md b/docs/multi_language_function_programming_interface/examples/index.md index 2eceb6b9f89d9c6ba07c418d23cf45d216d99e49..84c9d9aef458d628f75de3ad692f53e892d88ca2 100644 --- a/docs/multi_language_function_programming_interface/examples/index.md +++ b/docs/multi_language_function_programming_interface/examples/index.md @@ -9,5 +9,6 @@ affinity use_NPU_resource streaming-mapreduce + use_stream monte-carlo-pi ``` diff --git a/docs/multi_language_function_programming_interface/examples/use_stream.md b/docs/multi_language_function_programming_interface/examples/use_stream.md new file mode 100644 index 0000000000000000000000000000000000000000..b7f2d3d5a96de3d31b9886cf317d9401f0e7851f --- /dev/null +++ b/docs/multi_language_function_programming_interface/examples/use_stream.md @@ -0,0 +1,71 @@ +# 函数中使用流 + +本节通过简单的 Python 示例介绍如何在函数中使用流。 + +## 准备工作 + +参考[在主机上部署](../../deploy/deploy_processes/index.md)完成openYuanrong部署。 + +## 在无状态函数中使用 + +我们在主程序中创建消费者 `local_consumer`,该操作会隐式完成流 `exp-stream` 的创建。生产者为无状态函数,实例在远端运行。生产者和消费者协商使用字符串 `::END::` 作为流结束标志,处理完流后需要主动调用接口 `yr.delete_stream` 删除流,释放资源。 + +```python +import subprocess +import yr +import time + +@yr.invoke +def send_stream(stream_name, end_marker): + try: + # 创建生产者,配置自动 ACK + # 流发送会进行缓存,对于实时性要求较高的任务,可调低 delay_flush_time 的值,默认 5ms + producer_config = yr.ProducerConfig(delay_flush_time=5, page_size=1024 * 1024, max_stream_size=1024 * 1024 * 1024, auto_clean_up=True) + stream_producer = yr.create_stream_producer(stream_name, producer_config) + + corpus = subprocess.check_output(["python", "-c", "import this"]) + lines = corpus.decode().split("\n") + + i = 0 + for line in lines: + if len(line) > 0: + # 发送流 + stream_producer.send(yr.Element(line.encode(), i)) + print("send:" + line) + i += 1 + + # 发送业务约定的结束符号,关闭生产者 + stream_producer.send(yr.Element(end_marker.encode(), i)) + stream_producer.close() + print("stream producer is closed") + except RuntimeError as exp: + print("unexpected exp: ", exp) + + +if __name__ == '__main__': + yr.init() + + stream_name = "exp-stream" + end_marker = "::END::" + # 创建消费者,隐式创建流 + config = yr.SubscriptionConfig("local_consumer") + consumer = yr.create_stream_consumer(stream_name, config) + send_stream.invoke(stream_name, end_marker) + + end = False + while not end: + # 经过 1000ms 或收到 10 个 elements 就返回 + elements = consumer.receive(1000, 10) + for e in elements: + data_str = e.data.decode() + print("receive:" + data_str) + # 收到约定的结束符后,关闭消费者 + if data_str == end_marker: + consumer.close() + print("stream consumer is closed") + end = True + + # 需要显示删除流,否则流一直存在 + yr.delete_stream(stream_name) + yr.finalize() +``` diff --git a/docs/multi_language_function_programming_interface/index.md b/docs/multi_language_function_programming_interface/index.md index 4de5f23c0e0e8f0852ea38d08a84c218d23d6439..f56bb13b9e4d9047165a90d6693d9e9113a0eb5e 100644 --- a/docs/multi_language_function_programming_interface/index.md +++ b/docs/multi_language_function_programming_interface/index.md @@ -137,3 +137,4 @@ print(yr.get(data_ref)) # output {"key": "value"} - [有状态函数](./development_guide/stateful_function/index.md) - [无状态函数](./development_guide/stateless_function/index.md) - [数据对象](./development_guide/data_object/index.md) +- [数据流](./development_guide/data_stream/index.md) diff --git a/docs/multi_language_function_programming_interface/key_concept.md b/docs/multi_language_function_programming_interface/key_concept.md index 133b5c5263f030042a7e2df74543cf7df1375bea..b2a48f51bc1207ad31d6e4c95ea205e19dde8c1d 100644 --- a/docs/multi_language_function_programming_interface/key_concept.md +++ b/docs/multi_language_function_programming_interface/key_concept.md @@ -26,6 +26,8 @@ (key-concept-data-stream)= -## 数据流(即将开源) +## 数据流 数据流是可以在多个 openYuanrong 函数间跨节点分布式传递共享的有序无界内存数据集,支持基于共享内存的高性能 pub/sub 访问,支持一对一、一对多、多对一等多种发布订阅模式。通过数据流可方便地解耦多个不同函数,实现多个函数间异步流式数据传递和计算。 + +查看[数据流开发指南](./development_guide/data_stream/index.md)。