diff --git a/SparkOpOpenGauss/README.md b/SparkOpOpenGauss/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..dc247996a90711799edee1de40dcb0c90a651cbe
--- /dev/null
+++ b/SparkOpOpenGauss/README.md
@@ -0,0 +1,27 @@
+# SparkOpOpenGauss
+
+Spark sample code that uses openGauss as a data source; see ./src/main/scala/org.opengauss.spark.sources.datasourcev2
+
+Environment and software requirements (prerequisites):
+1. Java 1.8
+2. Scala 2.12
+3. postgresql.jar or opengauss-jdbc-${version}.jar (a jar you build yourself or the officially provided one)
+
+How to run:
+1. Make sure the database on the server is running and that your machine can connect to it.
+2. Execute ./resources/school.sql as the sparkuser user.
+3. Change the password of the sparkuser user and the database IP and port in the code, i.e. replace x.x.x.x:port. The main code is under `./src/main/scala/`.
+4. Building the code
+   + First obtain the openGauss JDBC jar, put it into the libs directory under the project root, and add the jar as a dependency in IDEA.
+   + This is a Maven project; IDEA will download the remaining dependencies via Maven automatically.
+   + To use the openGauss JDBC jar from the central repository instead, remove the openGauss JDBC dependency from the pom and change the JDBC url from `jdbc:postgresql://x.x.x.x:port/dbname` to `jdbc:opengauss://x.x.x.x:port/dbname`, where x.x.x.x is the IP, port is the port number, and dbname is the database name.
+5. In IDEA you can open the methods in the following example files and run them directly via right click.
+   + You can run `./src/test/scala/org/opengauss/spark/OpenGaussExample`.
+   + Another example that uses Spark JDBC directly is `./src/main/scala/SQLDataSourceExample`.
+
+Code structure:
+1. The `./src/main/scala/org.opengauss.spark.sources.datasourcev2` package contains examples of a simple data source implementing the Spark Datasource V2 interface, a simple multi-partition data source, a streaming implementation with tests, and a combined streaming-and-batch implementation with tests.
+2. The `./src/main/scala/org.opengauss.spark.sources.opengauss` package contains an implementation of Spark Datasource V2 that uses openGauss as the data source.
+3. `./src/test/scala/org/opengauss/spark/OpenGaussExample` contains three test methods: a simple data source example, and read and write examples for openGauss.
+
+PS: On Windows, debugging `DataSourceV2StreamAndBatchExample` and `DataSourceV2StreamingExample` in IDEA may raise errors. Download hadoop.dll and winutils.exe from [this link](https://github.com/steveloughran/winutils/tree/master/hadoop-3.0.0/bin) and add their location to the environment variables.
\ No newline at end of file
diff --git a/SparkOpOpenGauss/pom.xml b/SparkOpOpenGauss/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..32818d9dae5e701694be818e02bd92d0c8167e28
--- /dev/null
+++ b/SparkOpOpenGauss/pom.xml
@@ -0,0 +1,79 @@
+ + 4.0.0 + + org.example + SparkOpOG + 3.3.0-SNAPSHOT + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + org.scalatest + scalatest-maven-plugin + 1.0 + + ${project.build.directory}/surefire-reports + . 
+ WDF TestSuite.txt + + + + test + + test + + + + + + + + + + + + org.scalatest + scalatest_2.12 + 3.0.0 + test + + + org.apache.spark + spark-core_2.12 + 3.3.0-SNAPSHOT + + + + org.apache.spark + spark-sql_2.12 + 3.3.0-SNAPSHOT + + + + + + + + + + + + + Apache + https://repository.apache.org/snapshots/ + + true + + + + \ No newline at end of file diff --git a/SparkOpOpenGauss/resources/school.sql b/SparkOpOpenGauss/resources/school.sql new file mode 100644 index 0000000000000000000000000000000000000000..f830b05dc40dbaf3fbc7d6de4f9cc5cabda33bf4 --- /dev/null +++ b/SparkOpOpenGauss/resources/school.sql @@ -0,0 +1,214 @@ +create database school; + +\c school; + +BEGIN; + +-- 创建表student +CREATE TABLE student +( + std_id INT PRIMARY KEY, + std_name VARCHAR(20) NOT NULL, + std_sex VARCHAR(6), + std_birth DATE, + std_in DATE NOT NULL, + std_address VARCHAR(100) +); + +-- 创建表teacher +CREATE TABLE teacher +( + tec_id INT PRIMARY KEY, + tec_name VARCHAR(20) NOT NULL, + tec_job VARCHAR(15), + tec_sex VARCHAR(6), + tec_age INT, + tec_in DATE NOT NULL +); + +-- 创建表class +CREATE TABLE class +( + cla_id INT PRIMARY KEY, + cla_name VARCHAR(20) NOT NULL, + cla_teacher INT NOT NULL +); +-- 给表class添加外键约束 +ALTER TABLE class ADD CONSTRAINT fk_tec_id FOREIGN KEY (cla_teacher) REFERENCES teacher(tec_id) ON DELETE CASCADE; + +-- 创建表school_department +CREATE TABLE school_department +( + depart_id INT PRIMARY KEY, + depart_name VARCHAR(30) NOT NULL, + depart_teacher INT NOT NULL +); +-- 给表school_department添加外键约束 +ALTER TABLE school_department ADD CONSTRAINT fk_depart_tec_id FOREIGN KEY (depart_teacher) REFERENCES teacher(tec_id) ON DELETE CASCADE; + +-- 创建表course +CREATE TABLE course +( + cor_id INT PRIMARY KEY, + cor_name VARCHAR(30) NOT NULL, + cor_type VARCHAR(20), + credit DOUBLE PRECISION +); + +-- 插入数据 +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (1,'张一','男','1993-01-01','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (2,'张二','男','1993-01-02','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (3,'张三','男','1993-01-03','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (4,'张四','男','1993-01-04','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (5,'张五','男','1993-01-05','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (6,'张六','男','1993-01-06','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (7,'张七','男','1993-01-07','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (8,'张八','男','1993-01-08','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (9,'张九','男','1993-01-09','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (10,'李一','男','1993-01-10','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (11,'李二','男','1993-01-11','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (12,'李三','男','1993-01-12','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES 
(13,'李四','男','1993-01-13','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (14,'李五','男','1993-01-14','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (15,'李六','男','1993-01-15','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (16,'李七','男','1993-01-16','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (17,'李八','男','1993-01-17','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (18,'李九','男','1993-01-18','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (19,'王一','男','1993-01-19','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (20,'王二','男','1993-01-20','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (21,'王三','男','1993-01-21','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (22,'王四','男','1993-01-22','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (23,'王五','男','1993-01-23','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (24,'王六','男','1993-01-24','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (25,'王七','男','1993-01-25','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (26,'王八','男','1993-01-26','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (27,'王九','男','1993-01-27','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (28,'钱一','男','1993-01-28','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (29,'钱二','男','1993-01-29','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (30,'钱三','男','1993-01-30','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (31,'钱四','男','1993-02-01','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (32,'钱五','男','1993-02-02','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (33,'钱六','男','1993-02-03','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (34,'钱七','男','1993-02-04','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (35,'钱八','男','1993-02-05','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (36,'钱九','男','1993-02-06','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (37,'吴一','男','1993-02-07','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (38,'吴二','男','1993-02-08','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES 
(39,'吴三','男','1993-02-09','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (40,'吴四','男','1993-02-10','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (41,'吴五','男','1993-02-11','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (42,'吴六','男','1993-02-12','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (43,'吴七','男','1993-02-13','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (44,'吴八','男','1993-02-14','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (45,'吴九','男','1993-02-15','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (46,'柳一','男','1993-02-16','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (47,'柳二','男','1993-02-17','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (48,'柳三','男','1993-02-18','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (49,'柳四','男','1993-02-19','2011-09-01','江苏省南京市雨花台区'); +INSERT INTO student(std_id,std_name,std_sex,std_birth,std_in,std_address) VALUES (50,'柳五','男','1993-02-20','2011-09-01','江苏省南京市雨花台区'); + +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (1,'张一','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (2,'张二','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (3,'张三','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (4,'张四','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (5,'张五','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (6,'张六','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (7,'张七','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (8,'张八','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (9,'张九','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (10,'李一','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (11,'李二','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (12,'李三','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (13,'李四','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (14,'李五','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (15,'李六','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (16,'李七','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (17,'李八','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES 
(18,'李九','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (19,'王一','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (20,'王二','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (21,'王三','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (22,'王四','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (23,'王五','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (24,'王六','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (25,'王七','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (26,'王八','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (27,'王九','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (28,'钱一','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (29,'钱二','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (30,'钱三','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (31,'钱四','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (32,'钱五','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (33,'钱六','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (34,'钱七','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (35,'钱八','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (36,'钱九','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (37,'吴一','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (38,'吴二','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (39,'吴三','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (40,'吴四','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (41,'吴五','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (42,'吴六','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (43,'吴七','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (44,'吴八','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (45,'吴九','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (46,'柳一','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (47,'柳二','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (48,'柳三','讲师','男',35,'2009-07-01'); +INSERT INTO teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (49,'柳四','讲师','男',35,'2009-07-01'); +INSERT INTO 
teacher(tec_id,tec_name,tec_job,tec_sex,tec_age,tec_in) VALUES (50,'柳五','讲师','男',35,'2009-07-01'); + +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (1,'计算机',1); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (2,'自动化',3); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (3,'飞行器设计',5); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (4,'大学物理',7); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (5,'高等数学',9); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (6,'大学化学',12); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (7,'表演',14); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (8,'服装设计',16); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (9,'工业设计',18); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (10,'金融学',21); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (11,'医学',23); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (12,'土木工程',25); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (13,'机械',27); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (14,'建筑学',29); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (15,'经济学',32); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (16,'财务管理',34); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (17,'人力资源',36); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (18,'力学',38); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (19,'人工智能',41); +INSERT INTO class(cla_id,cla_name,cla_teacher) VALUES (20,'会计',45); + +INSERT INTO school_department(depart_id,depart_name,depart_teacher) VALUES (1,'计算机学院',2); +INSERT INTO school_department(depart_id,depart_name,depart_teacher) VALUES (2,'自动化学院',4); +INSERT INTO school_department(depart_id,depart_name,depart_teacher) VALUES (3,'航空宇航学院',6); +INSERT INTO school_department(depart_id,depart_name,depart_teacher) VALUES (4,'艺术学院',8); +INSERT INTO school_department(depart_id,depart_name,depart_teacher) VALUES (5,'理学院',11); +INSERT INTO school_department(depart_id,depart_name,depart_teacher) VALUES (6,'人工智能学院',13); +INSERT INTO school_department(depart_id,depart_name,depart_teacher) VALUES (7,'工学院',15); +INSERT INTO school_department(depart_id,depart_name,depart_teacher) VALUES (8,'管理学院',17); +INSERT INTO school_department(depart_id,depart_name,depart_teacher) VALUES (9,'农学院',22); +INSERT INTO school_department(depart_id,depart_name,depart_teacher) VALUES (10,'医学院',28); + +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (1,'数据库系统概论','必修',3); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (2,'艺术设计概论','选修',1); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (3,'力学制图','必修',4); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (4,'飞行器设计历史','选修',1); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (5,'马克思主义','必修',2); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (6,'大学历史','必修',2); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (7,'人力资源管理理论','必修',2.5); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (8,'线性代数','必修',4); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (9,'JAVA程序设计','必修',3); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (10,'操作系统','必修',4); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (11,'计算机组成原理','必修',3); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (12,'自动化设计理论','必修',2); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (13,'情绪表演','必修',2.5); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (14,'茶学历史','选修',1); +INSERT 
INTO course(cor_id,cor_name,cor_type,credit) VALUES (15,'艺术论','必修',1.5); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (16,'机器学习','必修',3); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (17,'数据挖掘','选修',2); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (18,'图像识别','必修',3); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (19,'解剖学','必修',4); +INSERT INTO course(cor_id,cor_name,cor_type,credit) VALUES (20,'3D max','选修',2); + +COMMIT; \ No newline at end of file diff --git a/SparkOpOpenGauss/src/main/scala/SQLDataSourceExample.scala b/SparkOpOpenGauss/src/main/scala/SQLDataSourceExample.scala new file mode 100644 index 0000000000000000000000000000000000000000..e7766e0163a879d776b40ae597b22cfda77b58ae --- /dev/null +++ b/SparkOpOpenGauss/src/main/scala/SQLDataSourceExample.scala @@ -0,0 +1,69 @@ +package org.apache.spark.examples.sql + +import java.util.Properties + +import org.apache.spark.sql.SparkSession + +object SQLDataSourceExample { + + case class Person(name: String, age: Long) + + def main(args: Array[String]): Unit = { + val spark = SparkSession + .builder() + .master("local") + .appName("Spark SQL data sources example") + .config("spark.some.config.option", "some-value") + .getOrCreate() + + runJdbcDatasetExample(spark) + + spark.stop() + } + + + private def runJdbcDatasetExample(spark: SparkSession): Unit = { + // $example on:jdbc_dataset$ + // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods + // Loading data from a JDBC source + val dburl = "jdbc:postgresql://x.x.x.x:port/school" //注意修改此处的ip与端口地址 + val jdbcDF = spark.read + .format("jdbc") + .option("url", dburl) + .option("dbtable", "class") + .option("user", "sparkuser") + .option("password", "pwdofsparkuser") + .load() + + val connectionProperties = new Properties() + connectionProperties.put("user", "sparkuser") + connectionProperties.put("password", "pwdofsparkuser") + val jdbcDF2 = spark.read + .option("customSchema","cla_id INT, cla_teacher STRING") + .jdbc(dburl, "class", connectionProperties) + // Specifying the custom data types of the read schema + + + connectionProperties.put("customSchema", "cla_id INT, cla_name STRING") + val jdbcDF3 = spark.read + .jdbc(dburl, "class", connectionProperties) + + // Saving data to a JDBC source. 
Create table "customtable1", and write data + jdbcDF.write + .format("jdbc") + .option("url", dburl) + .option("dbtable", "customtable1") + .option("user", "sparkuser") + .option("password", "pwdofsparkuser") + .save() + + + jdbcDF2.write + .jdbc(dburl, "customtable2", connectionProperties) + + // Specifying create table column data types on write + jdbcDF3.write + .option("createTableColumnTypes", "cla_id INT, cla_name VARCHAR(20)") + .jdbc(dburl, "customtable3", connectionProperties) + } +} \ No newline at end of file diff --git a/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/multi/SimpleMultiDataSource.scala b/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/multi/SimpleMultiDataSource.scala new file mode 100644 index 0000000000000000000000000000000000000000..a42c411e45a892d754daf532fb7a3c50d83017f3 --- /dev/null +++ b/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/multi/SimpleMultiDataSource.scala @@ -0,0 +1,103 @@ +package org.opengauss.spark.sources.datasourcev2.multi + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.JavaConverters._ + +/* + * Default source should some kind of relation provider + */ +class DefaultSource extends TableProvider { + + override def inferSchema(caseInsensitiveStringMap: CaseInsensitiveStringMap): StructType = + getTable(null, Array.empty[Transform], caseInsensitiveStringMap.asCaseSensitiveMap()).schema() + + override def getTable(structType: StructType, transforms: Array[Transform], map: util.Map[String, String]): Table = + new SimpleBatchTable() + +} + + +/* + Defines Read Support and Initial Schema + */ + +class SimpleBatchTable extends Table with SupportsRead { + override def name(): String = this.getClass.toString + + override def schema(): StructType = StructType(Array(StructField("value", StringType))) + + override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_READ).asJava + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new SimpleScanBuilder() +} + + +/* + Scan object with no mixins + */ +class SimpleScanBuilder extends ScanBuilder { + override def build(): Scan = new SimpleScan +} + +/* + Batch Reading Support + + The schema is repeated here as it can change after column pruning etc + */ + +class SimpleScan extends Scan with Batch { + override def readSchema(): StructType = StructType(Array(StructField("value", StringType))) + + override def toBatch: Batch = this + + override def planInputPartitions(): Array[InputPartition] = { + Array(new SimplePartition(0, 4), + new SimplePartition(5, 9)) + } + + override def createReaderFactory(): PartitionReaderFactory = new SimplePartitionReaderFactory() +} + +// simple class to organise the partition +class SimplePartition(val start: Int, val end: Int) extends InputPartition + +// reader factory +class SimplePartitionReaderFactory extends PartitionReaderFactory { + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = new + SimplePartitionReader(partition.asInstanceOf[SimplePartition]) +} + + +// parathion reader 
+class SimplePartitionReader(inputPartition: SimplePartition) extends PartitionReader[InternalRow] { + + val values = Array("1", "2", "3", "4", "5", "6", "7", "8", "9", "10") + + var index = inputPartition.start + + def next = index <= inputPartition.end + + def get = { + val stringValue = values(index) + val stringUtf = UTF8String.fromString(stringValue) + val row = InternalRow(stringUtf) + index = index + 1 + row + } + + def close() = Unit + +} + + + + diff --git a/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/simple/SimpleDataSource.scala b/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/simple/SimpleDataSource.scala new file mode 100644 index 0000000000000000000000000000000000000000..0b8e08163e8e6bfb33596155ec933069b0c0292e --- /dev/null +++ b/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/simple/SimpleDataSource.scala @@ -0,0 +1,100 @@ +package org.opengauss.spark.sources.datasourcev2.simple + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.JavaConverters._ + +/* + * Default source should some kind of relation provider + */ +class DefaultSource extends TableProvider{ + + override def inferSchema(caseInsensitiveStringMap: CaseInsensitiveStringMap): StructType = + getTable(null,Array.empty[Transform],caseInsensitiveStringMap.asCaseSensitiveMap()).schema() + + override def getTable(structType: StructType, transforms: Array[Transform], map: util.Map[String, String]): Table = + new SimpleBatchTable() +} + + +/* + Defines Read Support and Initial Schema + */ + +class SimpleBatchTable extends Table with SupportsRead { + override def name(): String = this.getClass.toString + + override def schema(): StructType = StructType(Array(StructField("value", StringType))) + + override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_READ).asJava + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new SimpleScanBuilder() +} + + + +/* + Scan object with no mixins + */ +class SimpleScanBuilder extends ScanBuilder { + override def build(): Scan = new SimpleScan +} + +/* + Batch Reading Support + + The schema is repeated here as it can change after column pruning etc + */ + +class SimpleScan extends Scan with Batch{ + override def readSchema(): StructType = StructType(Array(StructField("value", StringType))) + + override def toBatch: Batch = this + + override def planInputPartitions(): Array[InputPartition] = { + Array(new SimplePartition()) + } + override def createReaderFactory(): PartitionReaderFactory = new SimplePartitionReaderFactory() +} + +// simple class to organise the partition +class SimplePartition extends InputPartition + +// reader factory +class SimplePartitionReaderFactory extends PartitionReaderFactory { + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = new SimplePartitionReader +} + + +// parathion reader +class SimplePartitionReader extends PartitionReader[InternalRow] { + + val values = Array("1", "2", "3", "4", "5") + + var index = 0 + + def next = index < values.length + + def get = { + 
val stringValue = values(index) + val stringUtf = UTF8String.fromString(stringValue) + val row = InternalRow(stringUtf) + index = index + 1 + row + } + + def close() = Unit + +} + + + + diff --git a/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/streamandbatch/DataSourceV2StreamAndBatchExample.scala b/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/streamandbatch/DataSourceV2StreamAndBatchExample.scala new file mode 100644 index 0000000000000000000000000000000000000000..a0b42a04cfb52844c942e91de443fe71f36ff742 --- /dev/null +++ b/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/streamandbatch/DataSourceV2StreamAndBatchExample.scala @@ -0,0 +1,37 @@ +package org.opengauss.spark.sources.datasourcev2.streamandbatch + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.streaming.OutputMode + +object DataSourceV2StreamAndBatchExample { + def main(args: Array[String]): Unit = { + val sparkSession = SparkSession.builder. + master("local[2]") + .appName("streaming example") + .getOrCreate() + + + val dataSource = "org.opengauss.spark.sources.datasourcev2.streamandbatch" + + val batchDf = sparkSession + .read + .format(dataSource) + .load() + + batchDf.show() + + val streamingDf = sparkSession. + readStream. + format(dataSource) + .load() + + val query = streamingDf.writeStream + .format("console") + .queryName("simple_source") + .outputMode(OutputMode.Append()) + + query.start().awaitTermination() + + } + +} diff --git a/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/streamandbatch/SimpleStreamAndBatchDataSource.scala b/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/streamandbatch/SimpleStreamAndBatchDataSource.scala new file mode 100644 index 0000000000000000000000000000000000000000..0fe5d52f2bf658809c11412286725a557dbcb17a --- /dev/null +++ b/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/streamandbatch/SimpleStreamAndBatchDataSource.scala @@ -0,0 +1,130 @@ +package org.opengauss.spark.sources.datasourcev2.streamandbatch + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset} +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.JavaConverters._ + +/* + * Default source should some kind of relation provider + */ +class DefaultSource extends TableProvider{ + + override def inferSchema(caseInsensitiveStringMap: CaseInsensitiveStringMap): StructType = + getTable(null,Array.empty[Transform],caseInsensitiveStringMap.asCaseSensitiveMap()).schema() + + override def getTable(structType: StructType, transforms: Array[Transform], map: util.Map[String, String]): Table = + new SimpleStreamingTable() +} + + +/* + Defines Read Support and Initial Schema + */ + +class SimpleStreamingTable extends Table with SupportsRead { + override def name(): String = this.getClass.toString + + override def schema(): StructType = StructType(Array(StructField("value", StringType))) + + override def capabilities(): util.Set[TableCapability] = Set(TableCapability.MICRO_BATCH_READ, + 
TableCapability.BATCH_READ).asJava + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new SimpleScanBuilder() +} + + +/* + Scan object with no mixins + */ +class SimpleScanBuilder extends ScanBuilder { + override def build(): Scan = new SimpleScan +} + +/* + Batch Reading Support + + The schema is repeated here as it can change after column pruning etc + */ + +class SimpleScan extends Scan{ + override def readSchema(): StructType = StructType(Array(StructField("value", StringType))) + + override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = new SimpleMicroBatchStream() + + override def toBatch: Batch = new SimpleBatch +} + +class SimpleBatch extends Batch{ + override def planInputPartitions(): Array[InputPartition] = Array(new SimplePartition) + + override def createReaderFactory(): PartitionReaderFactory = new SimplePartitionReaderFactory +} + +class SimpleOffset(value:Int) extends Offset { + override def json(): String = s"""{"value":"$value"}""" +} + +class SimpleMicroBatchStream extends MicroBatchStream { + var latestOffsetValue = 0 + + override def latestOffset(): Offset = { + latestOffsetValue += 10 + new SimpleOffset(latestOffsetValue) + } + + override def planInputPartitions(offset: Offset, offset1: Offset): Array[InputPartition] = Array(new SimplePartition) + + override def createReaderFactory(): PartitionReaderFactory = new SimplePartitionReaderFactory() + + override def initialOffset(): Offset = new SimpleOffset(latestOffsetValue) + + override def deserializeOffset(s: String): Offset = new SimpleOffset(latestOffsetValue) + + override def commit(offset: Offset): Unit = {} + + override def stop(): Unit = {} +} + + +// simple class to organise the partition +class SimplePartition extends InputPartition + +// reader factory +class SimplePartitionReaderFactory extends PartitionReaderFactory { + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = new SimplePartitionReader +} + + +// parathion reader +class SimplePartitionReader extends PartitionReader[InternalRow] { + + val values = Array("1", "2", "3", "4", "5") + + var index = 0 + + def next = index < values.length + + def get = { + val stringValue = values(index) + val stringUtf = UTF8String.fromString(stringValue) + val row = InternalRow(stringUtf) + index = index + 1 + row + } + + def close() = Unit + +} + + + + diff --git a/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/streaming/DataSourceV2StreamingExample.scala b/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/streaming/DataSourceV2StreamingExample.scala new file mode 100644 index 0000000000000000000000000000000000000000..abd16c26508f476f9f3c56a70d2f71b2cfff5a32 --- /dev/null +++ b/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/streaming/DataSourceV2StreamingExample.scala @@ -0,0 +1,29 @@ +package org.opengauss.spark.sources.datasourcev2.streaming + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.streaming.OutputMode + +object DataSourceV2StreamingExample { + def main(args: Array[String]): Unit = { + val sparkSession = SparkSession.builder. + master("local[2]") + .appName("streaming example") + .getOrCreate() + + val streamingDf = sparkSession. + readStream. 
+ format("org.opengauss.spark.sources.datasourcev2.streaming") + .load() + + + + val query = streamingDf.writeStream + .format("console") + .queryName("simple_source") + .outputMode(OutputMode.Append()) + + query.start().awaitTermination() + + } + +} diff --git a/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/streaming/SimpleStreamingDataSource.scala b/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/streaming/SimpleStreamingDataSource.scala new file mode 100644 index 0000000000000000000000000000000000000000..89a37b8d819bcef12f63d2b0722b2477dc65043b --- /dev/null +++ b/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/datasourcev2/streaming/SimpleStreamingDataSource.scala @@ -0,0 +1,121 @@ +package org.opengauss.spark.sources.datasourcev2.streaming + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset} +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.JavaConverters._ + +/* + * Default source should some kind of relation provider + */ +class DefaultSource extends TableProvider{ + + override def inferSchema(caseInsensitiveStringMap: CaseInsensitiveStringMap): StructType = + getTable(null,Array.empty[Transform],caseInsensitiveStringMap.asCaseSensitiveMap()).schema() + + override def getTable(structType: StructType, transforms: Array[Transform], map: util.Map[String, String]): Table = + new SimpleStreamingTable() +} + + +/* + Defines Read Support and Initial Schema + */ + +class SimpleStreamingTable extends Table with SupportsRead { + override def name(): String = this.getClass.toString + + override def schema(): StructType = StructType(Array(StructField("value", StringType))) + + override def capabilities(): util.Set[TableCapability] = Set(TableCapability.MICRO_BATCH_READ).asJava + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new SimpleScanBuilder() +} + + +/* + Scan object with no mixins + */ +class SimpleScanBuilder extends ScanBuilder { + override def build(): Scan = new SimpleScan +} + +/* + Batch Reading Support + + The schema is repeated here as it can change after column pruning etc + */ + +class SimpleScan extends Scan{ + override def readSchema(): StructType = StructType(Array(StructField("value", StringType))) + + override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = new SimpleMicroBatchStream() +} + +class SimpleOffset(value:Int) extends Offset { + override def json(): String = s"""{"value":"$value"}""" +} + +class SimpleMicroBatchStream extends MicroBatchStream { + var latestOffsetValue = 0 + + override def latestOffset(): Offset = { + latestOffsetValue += 10 + new SimpleOffset(latestOffsetValue) + } + + override def planInputPartitions(offset: Offset, offset1: Offset): Array[InputPartition] = Array(new SimplePartition) + + override def createReaderFactory(): PartitionReaderFactory = new SimplePartitionReaderFactory() + + override def initialOffset(): Offset = new SimpleOffset(latestOffsetValue) + + override def deserializeOffset(s: String): Offset = new SimpleOffset(latestOffsetValue) + + override def 
commit(offset: Offset): Unit = {} + + override def stop(): Unit = {} +} + + +// simple class to organise the partition +class SimplePartition extends InputPartition + +// reader factory +class SimplePartitionReaderFactory extends PartitionReaderFactory { + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = new SimplePartitionReader +} + + +// parathion reader +class SimplePartitionReader extends PartitionReader[InternalRow] { + + val values = Array("1", "2", "3", "4", "5") + + var index = 0 + + def next = index < values.length + + def get = { + val stringValue = values(index) + val stringUtf = UTF8String.fromString(stringValue) + val row = InternalRow(stringUtf) + index = index + 1 + row + } + + def close() = Unit + +} + + + + diff --git a/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/opengauss/OpenGaussDataSource.scala b/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/opengauss/OpenGaussDataSource.scala new file mode 100644 index 0000000000000000000000000000000000000000..25dc02466c5dbfb4a61b595b45910a1189fb2bd2 --- /dev/null +++ b/SparkOpOpenGauss/src/main/scala/org/opengauss/spark/sources/opengauss/OpenGaussDataSource.scala @@ -0,0 +1,151 @@ +package org.opengauss.spark.sources.opengauss + +import java.sql.DriverManager +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog._ +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.connector.write._ +import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.JavaConverters._ + +class DefaultSource extends TableProvider { + override def inferSchema(options: CaseInsensitiveStringMap): StructType = OpenGaussTable.schema + + override def getTable( + schema: StructType, + partitioning: Array[Transform], + properties: util.Map[String, String] + ): Table = new OpenGaussTable(properties.get("tableName")) // TODO: Error handling +} + +class OpenGaussTable(val name: String) extends SupportsRead with SupportsWrite { + override def schema(): StructType = OpenGaussTable.schema + + override def capabilities(): util.Set[TableCapability] = Set( + TableCapability.BATCH_READ, + TableCapability.BATCH_WRITE + ).asJava + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new OpenGaussScanBuilder(options) + + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = new OpenGaussWriteBuilder(info.options) +} + +object OpenGaussTable { + /*Table products*/ + /*Database school, table course*/ + val schema: StructType = new StructType().add("cor_id", IntegerType).add("cor_name", StringType).add("cor_type", StringType).add("credit", DoubleType) +} + +case class ConnectionProperties(url: String, user: String, password: String, tableName: String, partitionColumn: String, partitionSize: Int) + + + +/** Read */ + +class OpenGaussScanBuilder(options: CaseInsensitiveStringMap) extends ScanBuilder { + override def build(): Scan = new OpenGaussScan(ConnectionProperties( + options.get("url"), options.get("user"), options.get("password"), options.get("tableName"), options.get("partitionColumn"), options.get("partitionSize").toInt + )) +} + +class OpenGaussPartition extends InputPartition + +class OpenGaussScan(connectionProperties: ConnectionProperties) extends Scan 
  with Batch {
+  override def readSchema(): StructType = OpenGaussTable.schema
+
+  override def toBatch: Batch = this
+
+  override def planInputPartitions(): Array[InputPartition] = Array(new OpenGaussPartition)
+
+  override def createReaderFactory(): PartitionReaderFactory = new OpenGaussPartitionReaderFactory(connectionProperties)
+}
+
+class OpenGaussPartitionReaderFactory(connectionProperties: ConnectionProperties)
+  extends PartitionReaderFactory {
+  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = new OpenGaussPartitionReader(connectionProperties)
+}
+
+class OpenGaussPartitionReader(connectionProperties: ConnectionProperties) extends PartitionReader[InternalRow] {
+  private val connection = DriverManager.getConnection(
+    connectionProperties.url, connectionProperties.user, connectionProperties.password
+  )
+  private val statement = connection.createStatement()
+  private val resultSet = statement.executeQuery(s"select * from ${connectionProperties.tableName}")
+
+  override def next(): Boolean = resultSet.next()
+
+  override def get(): InternalRow = InternalRow(
+    resultSet.getInt(1),
+    UTF8String.fromString(resultSet.getString(2)),
+    UTF8String.fromString(resultSet.getString(3)),
+    resultSet.getDouble(4))
+
+  override def close(): Unit = connection.close()
+}
+
+/** Write */
+
+class OpenGaussWriteBuilder(options: CaseInsensitiveStringMap) extends WriteBuilder {
+  override def buildForBatch(): BatchWrite = new OpenGaussBatchWrite(ConnectionProperties(
+    options.get("url"), options.get("user"), options.get("password"), options.get("tableName"), options.get("partitionColumn"), options.get("partitionSize").toInt
+  ))
+}
+
+class OpenGaussBatchWrite(connectionProperties: ConnectionProperties) extends BatchWrite {
+  override def createBatchWriterFactory(physicalWriteInfo: PhysicalWriteInfo): DataWriterFactory =
+    new OpenGaussDataWriterFactory(connectionProperties)
+
+  override def commit(writerCommitMessages: Array[WriterCommitMessage]): Unit = {}
+
+  override def abort(writerCommitMessages: Array[WriterCommitMessage]): Unit = {}
+}
+
+class OpenGaussDataWriterFactory(connectionProperties: ConnectionProperties) extends DataWriterFactory {
+  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] =
+    new OpenGaussWriter(connectionProperties)
+}
+
+object WriteSucceeded extends WriterCommitMessage
+
+class OpenGaussWriter(connectionProperties: ConnectionProperties) extends DataWriter[InternalRow] {
+
+  val connection = DriverManager.getConnection(
+    connectionProperties.url,
+    connectionProperties.user,
+    connectionProperties.password
+  )
+
+  // Insert into the same course schema that this table declares for reads;
+  // the s-interpolator is required so tableName is substituted into the SQL text.
+  val statement = s"insert into ${connectionProperties.tableName} (cor_id, cor_name, cor_type, credit) values (?,?,?,?)"
+  val preparedStatement = connection.prepareStatement(statement)
+
+  override def write(record: InternalRow): Unit = {
+    val corId = record.getInt(0)
+    val corName = record.getString(1)
+    val corType = record.getString(2)
+    val credit = record.getDouble(3)
+
+    // JDBC parameter indices are 1-based.
+    preparedStatement.setInt(1, corId)
+    preparedStatement.setString(2, corName)
+    preparedStatement.setString(3, corType)
+    preparedStatement.setDouble(4, credit)
+    preparedStatement.executeUpdate()
+  }
+
+  override def commit(): WriterCommitMessage = WriteSucceeded
+
+  override def abort(): Unit = {}
+
+  override def close(): Unit = connection.close()
+}
+
diff --git a/SparkOpOpenGauss/src/test/scala/org/opengauss/spark/OpenGaussExample.scala b/SparkOpOpenGauss/src/test/scala/org/opengauss/spark/OpenGaussExample.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0f7270db973c0e5b34b4d10c7a804d81cbdf4009
--- /dev/null
+++ b/SparkOpOpenGauss/src/test/scala/org/opengauss/spark/OpenGaussExample.scala
@@ -0,0 +1,79 @@
+package org.opengauss.spark
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.scalatest.FlatSpec
+import java.sql.DriverManager
+import java.util.Properties
+
+import org.scalatest.Matchers.convertToAnyShouldWrapper
+
+class OpenGaussExample extends FlatSpec {
+
+  val testTableName = "course"
+  val dburl = "jdbc:postgresql://x.x.x.x:port/school" // change this to the IP and port of your own machine
+
+  "Simple data source" should "read" in {
+    val sparkSession = SparkSession.builder
+      .master("local[2]")
+      .appName("example")
+      .getOrCreate()
+
+    val simpleDf = sparkSession.read
+      .format("org.opengauss.spark.sources.datasourcev2.simple")
+      .load()
+
+    simpleDf.show()
+    println(
+      "number of partitions in simple source is " + simpleDf.rdd.getNumPartitions)
+  }
+
+  "openGauss data source" should "read table" in {
+    val spark = SparkSession
+      .builder()
+      .master("local[*]")
+      .appName("OpenGaussReaderJob")
+      .getOrCreate()
+
+    spark
+      .read
+      .format("org.opengauss.spark.sources.opengauss")
+      .option("url", dburl)
+      .option("user", "sparkuser")
+      .option("password", "pwdofsparkuser")
+      .option("tableName", testTableName)
+      .option("partitionSize", 10)
+      .load()
+      .show()
+
+    spark.stop()
+  }
+
+  "openGauss data source" should "write table" in {
+    val spark = SparkSession
+      .builder()
+      .master("local[*]")
+      .appName("OpenGaussWriterJob")
+      .getOrCreate()
+
+    import spark.implicits._
+
+    // Build rows that match the course schema declared by the data source.
+    val df = (60 to 70)
+      .map(i => (i, s"course_$i", "选修", 2.0))
+      .toDF("cor_id", "cor_name", "cor_type", "credit")
+
+    df
+      .write
+      .format("org.opengauss.spark.sources.opengauss")
+      .option("url", dburl)
+      .option("user", "sparkuser")
+      .option("password", "pwdofsparkuser")
+      .option("tableName", testTableName)
+      .option("partitionSize", 10)
+      .option("partitionColumn", "cor_id")
+      .mode(SaveMode.Append)
+      .save()
+
+    spark.stop()
+  }
+
+}
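For quick experimentation outside of the ScalaTest suite above, a minimal standalone driver for the custom openGauss source might look like the following sketch. It is not part of the diff; it only reuses options already shown in `OpenGaussExample`, and the host, port, user `sparkuser` and password `pwdofsparkuser` are the same placeholders used throughout the repository.

```scala
package org.opengauss.spark

import org.apache.spark.sql.SparkSession

// Hypothetical quick-start driver (not part of the diff above); it assumes the
// school database created by ./resources/school.sql and the placeholder
// credentials used throughout this repository.
object OpenGaussQuickStart {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("OpenGaussQuickStart")
      .getOrCreate()

    // Read the course table through the custom Datasource V2 implementation.
    val courses = spark.read
      .format("org.opengauss.spark.sources.opengauss")
      .option("url", "jdbc:postgresql://x.x.x.x:port/school") // replace host and port
      .option("user", "sparkuser")
      .option("password", "pwdofsparkuser")
      .option("tableName", "course")
      .option("partitionSize", 10)
      .load()

    courses.show()
    spark.stop()
  }
}
```

Run it like the other local examples, with the openGauss JDBC jar on the classpath.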
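Step 4 of the README describes switching from the PostgreSQL-protocol jar to the opengauss-jdbc artifact from Maven Central and changing the URL scheme. As a hedged illustration of what that looks like on the plain Spark JDBC path used by `SQLDataSourceExample` (the driver class name `org.opengauss.Driver` is an assumption about that artifact, not something shown in this diff):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes the opengauss-jdbc driver from Maven Central is on the
// classpath and registers the jdbc:opengauss:// URL scheme with driver class
// org.opengauss.Driver. Host, port and credentials are placeholders.
object OpenGaussJdbcUrlExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("opengauss-jdbc-url")
      .getOrCreate()

    val classDf = spark.read
      .format("jdbc")
      .option("url", "jdbc:opengauss://x.x.x.x:port/school")
      .option("driver", "org.opengauss.Driver")
      .option("dbtable", "class")
      .option("user", "sparkuser")
      .option("password", "pwdofsparkuser")
      .load()

    classDf.show()
    spark.stop()
  }
}
```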