# rocketmq-streams
**Repository Path**: mirrors_apache/rocketmq-streams
## Basic Information
- **Project Name**: rocketmq-streams
- **Description**: Apache rocketmq
- **Primary Language**: Unknown
- **License**: Apache-2.0
- **Default Branch**: develop
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 2
- **Created**: 2021-07-13
- **Last Updated**: 2025-09-20
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# RocketMQ Streams
[](https://app.travis-ci.com/apache/rocketmq-streams)
[](https://app.codecov.io/gh/apache/rocketmq-streams)
[](https://github.com/apache/rocketmq-streams/releases)
[](https://www.apache.org/licenses/LICENSE-2.0.html)
[](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue")
[](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
[](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
**RocketMQ Streams is a lightweight stream processing framework, application gains the stream processing ability by depending on RocketMQ Streams as an SDK.**
It offers a variety of features:
* Function:
* One-to-one transform function, such as: filter, map, foreach
* Aggregate function, such as: sum, min, max, count, aggregate
* Generating function, such as: flatMap
* Group by aggregate and window aggregate
* Join stream
* Custom serialization
----------
## Quick Start
This paragraph guides you running a stream processing with RocketMQ Streams.
### Run RocketMQ 5.0 locally
[RocketMQ quick-start](https://rocketmq.apache.org/docs/quick-start/)
RocketMQ runs on all major operating systems and requires only a Java JDK version 8 or higher to be installed.
To check, run `java -version`:
```shell
$ java -version
java version "1.8.0_121"
```
**1) Download RocketMQ**
```shell
wget https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
# Unpack the release
$ unzip rocketmq-all-5.0.0-bin-release.zip
$ cd rocketmq-all-5.0.0-bin-release/bin
```
**2) Start NameServer**
NameServer will be listening at `0.0.0.0:9876`, make sure that the port is not used by others on the local machine, and then do as follows.
```shell
### start Name Server
$ nohup sh mqnamesrv &
### check whether Name Server is successfully started
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
```
**2) Start Broker**
```shell
### start Broker
$ nohup sh bin/mqbroker -n localhost:9876 &
### check whether Broker is successfully started, eg: Broker's IP is 192.168.1.2, Broker's name is broker-a
$ tail -f ~/logs/rocketmqlogs/broker.log
The broker[broker-a, 192.169.1.2:10911] boot success...
```
### Build stream processing application
**1) Build application in IDE**
**2) Add RocketMQ Streams dependency**
```xml
org.apache.rocketmq
rocketmq-streams
{current.version}
```
**3) Build stream processing application**
* create topic in RocketMQ before start the stream processing.
```shell
sh bin/mqadmin updateTopic -c ${clusterName} -t ${topicName} -r 8 -w 8 -n 127.0.0.1:9876
```
NOTE: the default clusterName is DefaultCluster in this quick-start doc, changes it with your RocketMQ cluster.
* add your stream processing code, The following is an example. more examples are [here](./example/src/main/java/org/apache/rocketmq/streams/examples).
```java
public static void main(String[] args) {
StreamBuilder builder = new StreamBuilder("wordCount");
builder.source("sourceTopic", total -> {
String value = new String(total, StandardCharsets.UTF_8);
return new Pair<>(null, value);
})
.flatMap((ValueMapperAction>) value -> {
String[] splits = value.toLowerCase().split("\\W+");
return Arrays.asList(splits);
})
.keyBy(value -> value)
.count()
.toRStream()
.print();
TopologyBuilder topologyBuilder = builder.build();
Properties properties = new Properties();
properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {
@Override
public void run() {
rocketMQStream.stop();
latch.countDown();
}
});
try {
rocketMQStream.start();
latch.await();
} catch (final Throwable e) {
System.exit(1);
}
System.exit(0);
}
```