# flume-ng-sql-source

**Repository Path**: chenxiaokai_git/flume-ng-sql-source

## Basic Information

- **Project Name**: flume-ng-sql-source
- **Description**: flume-ng-sql-source oracle
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2020-08-03
- **Last Updated**: 2020-12-19

## Categories & Tags

**Categories**: Uncategorized
**Tags**: None

## README

Modified from https://github.com/keedio/flume-ng-sql-source to support Oracle
=======

flume-ng-sql-source
================

This project is used for [flume-ng](https://github.com/apache/flume) to communicate with SQL databases.

Current SQL database engines supported
-------------------------------
- After the last update the code has been integrated with Hibernate, so all databases supported by this technology should work.

Compilation and packaging
----------
```
$ mvn package
```

Deployment
----------

Copy flume-ng-sql-source-<version>.jar from the target folder into the flume plugins directory:
```
$ mkdir -p $FLUME_HOME/plugins.d/sql-source/lib $FLUME_HOME/plugins.d/sql-source/libext
$ cp flume-ng-sql-source-0.8.jar $FLUME_HOME/plugins.d/sql-source/lib
```

### Specific installation by database engine

##### MySQL
Download the official MySQL JDBC driver and copy it into the libext flume plugins directory:
```
$ wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz
$ tar xzf mysql-connector-java-5.1.35.tar.gz
$ cp mysql-connector-java-5.1.35-bin.jar $FLUME_HOME/plugins.d/sql-source/libext
```

##### Microsoft SQLServer
Download the official Microsoft 4.1 SQL Server JDBC driver and copy it into the libext flume plugins directory:

Download URL: https://www.microsoft.com/es-es/download/details.aspx?id=11774
```
$ tar xzf sqljdbc_4.1.5605.100_enu.tar.gz
$ cp sqljdbc_4.1/enu/sqljdbc41.jar $FLUME_HOME/plugins.d/sql-source/libext
```

##### IBM DB2
Download the official IBM DB2 JDBC driver and copy it into the libext flume plugins directory:

Download URL: http://www-01.ibm.com/support/docview.wss?uid=swg21363866
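For example, assuming the downloaded DB2 driver package contains a jar named db2jcc4.jar (the exact file name depends on the driver version you download), the copy step mirrors the other engines:
```
$ # Assumption: the DB2 JDBC driver jar is named db2jcc4.jar; adjust to the file from your download
$ cp db2jcc4.jar $FLUME_HOME/plugins.d/sql-source/libext
```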
Configuration of SQL Source:
----------

Mandatory properties in **bold**

| Property Name | Default | Description |
| ----------------------- | :-----: | :---------- |
| channels | - | Connected channel names |
| type | - | The component type name, needs to be org.keedio.flume.source.SQLSource |
| hibernate.connection.url | - | URL used to connect with the remote database |
| hibernate.connection.user | - | Username to connect with the database |
| hibernate.connection.password | - | Password to connect with the database |
| table | - | Table to export data from |
| status.file.name | - | Local file name to save the last row number read |
| status.file.path | /var/lib/flume | Path to save the status file |
| start.from | 0 | Start value to import data from |
| delimiter.entry | , | Delimiter between the fields of an output entry |
| enclose.by.quotes | true | Whether quotes are applied to all values in the output |
| columns.to.select | * | Which columns of the table will be selected |
| run.query.delay | 10000 | Milliseconds to wait between query runs |
| batch.size | 100 | Batch size to send events to the flume channel |
| max.rows | 10000 | Max rows to import per query |
| read.only | false | Opens a read-only session with the database |
| custom.query | - | Custom query to force a special request to the DB, be careful. Check the explanation of this property below |
| hibernate.connection.driver_class | - | Driver class used by Hibernate, if not specified the framework will auto assign one |
| hibernate.dialect | - | Dialect used by Hibernate, if not specified the framework will auto assign one. Check https://docs.jboss.org/hibernate/orm/4.3/manual/en-US/html/ch03.html#configuration-optional-dialects for a complete list of available dialects |
| hibernate.connection.provider_class | - | Set to org.hibernate.connection.C3P0ConnectionProvider to use the C3P0 connection pool (recommended for production) |
| hibernate.c3p0.min_size | - | Min connection pool size |
| hibernate.c3p0.max_size | - | Max connection pool size |
| default.charset.resultset | UTF-8 | Charset encoding the result set read from the DB is converted to |

Standard Query
-------------

If no custom query is set, ```SELECT <columns.to.select> FROM <table>``` will be executed every ```run.query.delay``` milliseconds.

Custom Query
-------------

A custom query is supported, which makes the whole SQL language available. This is powerful, but risky: be careful with the custom queries you use. To avoid exporting rows repeatedly, use the $@$ special character in the WHERE clause so that unprocessed rows and newly inserted ones are exported incrementally.

IMPORTANT: For the custom query to work properly, make sure the incremental field is returned in the first position of the query result.

Example:
```
agent.sources.sql-source.custom.query = SELECT incrementalField,field2 FROM table1 WHERE incrementalField > $@$
```

Configuration example
--------------------

```properties
# For each one of the sources, the type is defined
agent.sources.sqlSource.type = org.keedio.flume.source.SQLSource

agent.sources.sqlSource.hibernate.connection.url = jdbc:db2://192.168.56.70:50000/sample

# Hibernate Database connection properties
agent.sources.sqlSource.hibernate.connection.user = db2inst1
agent.sources.sqlSource.hibernate.connection.password = db2inst1
agent.sources.sqlSource.hibernate.connection.autocommit = true
agent.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.DB2Dialect
agent.sources.sqlSource.hibernate.connection.driver_class = com.ibm.db2.jcc.DB2Driver

#agent.sources.sqlSource.table = employee1

# Columns to import (default * imports the entire row)
#agent.sources.sqlSource.columns.to.select = *

# Query delay: the query is run every this many milliseconds
agent.sources.sqlSource.run.query.delay=10000

# Status file is used to save the last row read
agent.sources.sqlSource.status.file.path = /var/log/flume
agent.sources.sqlSource.status.file.name = sqlSource.status

# Custom query
agent.sources.sqlSource.start.from = 19700101000000000000
agent.sources.sqlSource.custom.query = SELECT * FROM (select DECIMAL(test) * 1000000 AS INCREMENTAL, EMPLOYEE1.* from employee1 UNION select DECIMAL(test) * 1000000 AS INCREMENTAL, EMPLOYEE2.* from employee2) WHERE INCREMENTAL > $@$ ORDER BY INCREMENTAL ASC

agent.sources.sqlSource.batch.size = 1000
agent.sources.sqlSource.max.rows = 1000
agent.sources.sqlSource.delimiter.entry = |

agent.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sqlSource.hibernate.c3p0.min_size=1
agent.sources.sqlSource.hibernate.c3p0.max_size=10

# The channel can be defined as follows.
agent.sources.sqlSource.channels = memoryChannel
```

Known Issues
---------

An issue with Java SQL types and Hibernate types can appear when using SQL Server databases with the SQL Server dialect shipped with Hibernate. Something like:

```
org.hibernate.MappingException: No Dialect mapping for JDBC type: -15
```

Use ```org.keedio.flume.source.SQLServerCustomDialect``` in the flume configuration file to solve this problem.
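A minimal sketch of what that override looks like, reusing the ```hibernate.dialect``` property from the table above (the agent and source names are just the placeholders from the configuration example):
```properties
# Replace the stock SQL Server dialect with the custom dialect bundled in this source
agent.sources.sqlSource.hibernate.dialect = org.keedio.flume.source.SQLServerCustomDialect
```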
Special thanks
---------------

I used flume-ng-kafka-source (https://github.com/baniuyao/flume-ng-kafka-source.git) as a guide. Thanks to [Frank Yao](https://github.com/baniuyao).

Version History
---------------

+ Version 1.5.1: the charset encoding of the result set is now configurable.
+ Stable version is 1.5.0 (compatible with Apache Flume 1.8.0)
+ Previous stable version is 1.4.3 (compatible with Apache Flume prior to 1.7.0)