# flink-streaming-udf

**Repository Path**: cloudcoder/flink-streaming-udf

## Basic Information

- **Project Name**: flink-streaming-udf
- **Description**: A Flink-based UDF project
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2021-09-17
- **Last Updated**: 2023-06-02

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# flink-streaming-udf

A Flink-based user-defined function (UDF) project, used to study the following kinds of functions (a minimal scalar function sketch appears at the end of this README):

- Scalar functions map scalar values to a new scalar value.
- Table functions map scalar values to new rows.
- Aggregate functions map scalar values of multiple rows to a new scalar value.
- Table aggregate functions map scalar values of multiple rows to new rows.
- Async table functions are special functions for table sources that perform a lookup.

[Official UDF documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/)

[Built-in system functions](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/systemfunctions/#aggregate-functions)

# JDBC connector extension

## ClickHouse extension

Note: because `AbstractJdbcRowConverter` is package-visible, a `RowConverter` that extends it must be placed in the same package.

1. Write `ClickhouseJDBCDialect`, modeled on `MySQLDialect`.
2. Add `ClickhouseJDBCDialect` to the `DIALECTS` list in the `JdbcDialects` class.
3. Write `ClickhouseRowConverter`: note that ClickHouse returns `UInt64` as `BigInteger` and `UInt32` as `Long`, so the `createInternalConverter` method must be overridden.
4. Put the recompiled `JdbcDialects` into `flink-connector-jdbc_2.11-1.13.2.jar` and replace that jar in the Flink distribution.
5. Package the new `*Dialect` and `*RowConverter` classes and place the jar in the Flink job's extension jar directory.

Sketches of the dialect (step 1) and the row converter (step 3) follow the SQL examples below.

Test as follows (on the ClickHouse side):

```sql
DROP TABLE test;

CREATE TABLE IF NOT EXISTS test
(
    INFO_ID   String,
    DEVICE_ID String,
    SHOT_TIME UInt64,
    SHOT_DATE Nullable(UInt32)
) ENGINE = MergeTree()
PARTITION BY (toInt32(SHOT_TIME/1000000))
ORDER BY (DEVICE_ID, SHOT_TIME)
SETTINGS index_granularity = 8192;

INSERT INTO test VALUES('1','10000001',20210918150000,20210918),('1','10000001',20210918150001,20210918),('1','10000001',20210918150002,20210918);
```

In the Flink sql-client:

```sql
CREATE TABLE test (
    INFO_ID   STRING,
    DEVICE_ID STRING,
    SHOT_TIME BIGINT,
    SHOT_DATE INT,
    PRIMARY KEY (INFO_ID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:clickhouse://172.25.21.16:8123/test?socket_timeout=3000000',
    'table-name' = 'test',
    'username' = 'suntek',
    'password' = '123456'
);

SELECT * FROM test LIMIT 2;

SELECT SHOT_DATE, COUNT(1) AS TOTAL FROM test WHERE SHOT_TIME BETWEEN 20210901000000 AND 20210930000000 GROUP BY SHOT_DATE;

INSERT INTO test VALUES('1','10000001',20210921150000,20210921),('1','10000001',20210921150001,20210921),('1','10000001',20210921150002,20210921);
```
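For step 1, a minimal sketch of the dialect. It implements the `JdbcDialect` interface directly to stay short; the real class may extend `AbstractDialect` the way `MySQLDialect` does. Method and package names follow the flink-connector-jdbc 1.13 internals, and the driver class name and URL prefix are assumptions for the classic ClickHouse JDBC driver — verify both against the connector and driver versions you actually ship:

```java
package org.apache.flink.connector.jdbc.dialect;

import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.ClickhouseRowConverter;
import org.apache.flink.table.types.logical.RowType;

import java.util.Optional;

/** JDBC dialect for ClickHouse; register it in the DIALECTS list of JdbcDialects (step 2). */
public class ClickhouseJDBCDialect implements JdbcDialect {

    @Override
    public String dialectName() {
        return "ClickHouse";
    }

    @Override
    public boolean canHandle(String url) {
        // Matched against the 'url' option of the table definition.
        return url.startsWith("jdbc:clickhouse:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new ClickhouseRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }

    @Override
    public Optional<String> defaultDriverName() {
        // Assumes the classic ClickHouse JDBC driver; adjust if you use a newer driver artifact.
        return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "`" + identifier + "`";
    }
}
```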
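For step 3, a minimal sketch of the row converter. It sits in the `internal.converter` package because of the visibility note above, and overrides `createInternalConverter` for the two ClickHouse quirks the README describes. `JdbcDeserializationConverter` is the inner converter interface of `AbstractJdbcRowConverter` in the 1.13 connector; check the exact name against your connector version:

```java
package org.apache.flink.connector.jdbc.internal.converter;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import java.math.BigInteger;

/** Row converter for ClickHouse; lives here because AbstractJdbcRowConverter is package-visible. */
public class ClickhouseRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    public ClickhouseRowConverter(RowType rowType) {
        super(rowType);
    }

    @Override
    public String converterName() {
        return "ClickHouse";
    }

    @Override
    protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case BIGINT:
                // ClickHouse hands UInt64 back as java.math.BigInteger, not Long.
                return val -> val instanceof BigInteger ? ((BigInteger) val).longValue() : val;
            case INTEGER:
                // ClickHouse hands UInt32 back as java.lang.Long, not Integer.
                return val -> val instanceof Long ? ((Long) val).intValue() : val;
            default:
                return super.createInternalConverter(type);
        }
    }
}
```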
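Finally, back to the UDF half of the project: the simplest of the five kinds listed at the top is a scalar function. This sketch follows the Flink 1.13 UDF documentation (`SubstringFunction` is the docs' illustrative example, not a class from this repository):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

/** A scalar function: maps scalar values to a new scalar value. */
public class SubstringFunction extends ScalarFunction {

    // eval() is resolved by reflection; overloads with other argument types are allowed.
    public String eval(String s, Integer begin, Integer end) {
        return s.substring(begin, end);
    }

    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register the function, then call it from SQL. Prints "flink".
        env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
        env.executeSql("SELECT SubstringFunction('flink-streaming-udf', 0, 5)").print();
    }
}
```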