# aliyun-cupid-sdk

SDK for open-source frameworks to interact with MaxCompute (licensed under Apache-2.0).

* [1. Preparing the Development Environment](#1)
  * [1.1 Download and Extract the Spark Package](#1.1)
  * [1.2 Set Environment Variables](#1.2)
  * [1.3 Configure spark-defaults.conf](#1.3)
* [2. Get and Build the Example Template](#2)
* [3. Maven Dependencies](#3)
* [4. Common Interfaces and Running Applications](#4)
  * [4.1 Spark Shell](#4.1)
  * [4.2 Spark R](#4.2)
  * [4.3 Spark SQL](#4.3)
  * [4.4 Spark JDBC](#4.4)
* [5. Common Use-Case Demos](#5)
  * [5.1 WordCount](#5.1)
  * [5.2 Accessing OSS](#5.2)
  * [5.3 MaxCompute Table Read/Write](#5.3)
    * [5.3.1 Read a Non-Partitioned Table](#5.3.1)
    * [5.3.2 Read a Table with a Single Partition Column](#5.3.2)
    * [5.3.3 Read a Table with Multiple Partition Columns](#5.3.3)
    * [5.3.4 Read Multiple Partitions (Multiple Columns per Partition)](#5.3.4)
    * [5.3.5 Save an RDD to a Non-Partitioned Table](#5.3.5)
    * [5.3.6 Save an RDD to a Specific Partition of a Partitioned Table](#5.3.6)
    * [5.3.7 Save an RDD to a Partitioned Table with Dynamic Partitioning](#5.3.7)
  * [5.4 MaxCompute Table Spark SQL](#5.4)
  * [5.5 MaxCompute Client Mode](#5.5)
  * [5.6 MaxCompute Table PySpark](#5.6)
  * [5.7 MLlib](#5.7)
  * [5.8 Interactive PySpark](#5.8)
  * [5.9 Interactive spark-shell (Reading Tables)](#5.9)
  * [5.10 Interactive spark-shell (MLlib + OSS Read/Write)](#5.10)
  * [5.11 Interactive SparkR](#5.11)
  * [5.12 GraphX PageRank](#5.12)
  * [5.13 Spark Streaming NetworkWordCount](#5.13)
* [6. Special Notes](#6)
  * [6.1 Special Configuration for Spark Streaming Jobs](#6.1)
  * [6.2 Tracking URL](#6.2)
* [7. References](#7)
* [8. License](#8)

## 1. Preparing the Development Environment

**!!! Note !!!** The documentation and code on the master branch only support the MaxCompute AliSpark **private cloud release**. For the public cloud, see the [MaxCompute AliSpark public cloud entry](https://github.com/aliyun/MaxCompute-Spark/wiki).

### 1.1 Download and Extract the Spark Package

1. Download the [Spark on MaxCompute](http://repo.aliyun.com/download/spark-2.1.0-private-cloud-v3.5.0-20190427.tar.gz) package.
2. Extract the downloaded Spark on MaxCompute release package. The extracted directory structure looks like this:

```
.
|-- RELEASE
|-- __spark_libs__.zip
|-- bin
|-- conf
|-- cupid
|-- derby.log
|-- examples
|-- jars
|-- logs
|-- metastore_db
|-- python
|-- sbin
|-- yarn
```

### 1.2 Set Environment Variables

Set JAVA_HOME:

```
export JAVA_HOME=/path/to/jdk
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
```

Set SPARK_HOME:

```
export SPARK_HOME=/path/to/spark_extracted_package
export PATH=$SPARK_HOME/bin:$PATH
```

SparkR users need to install R under /home/admin/R and add it to PATH:

```
export PATH=/home/admin/R/bin/:$PATH
```

PySpark users should install Python 2.7 and add it to PATH:

```
export PATH=/path/to/python/bin/:$PATH
```

### 1.3 Configure spark-defaults.conf

A spark-defaults.conf file exists under $SPARK_HOME/conf. You must fill in the MaxCompute account information in this file before Spark jobs can be submitted to MaxCompute. The default content is shown below; fill in the blank entries with your actual account information.

```
# OdpsAccount Info Setting
spark.hadoop.odps.project.name=
spark.hadoop.odps.access.id=
spark.hadoop.odps.access.key=
spark.hadoop.odps.end.point=
#spark.hadoop.odps.moye.trackurl.host=
#spark.hadoop.odps.cupid.webproxy.endpoint=
spark.sql.catalogImplementation=odps

# spark-shell Setting
spark.driver.extraJavaOptions -Dscala.repl.reader=com.aliyun.odps.spark_repl.OdpsInteractiveReader -Dscala.usejavacp=true

# SparkR Setting
# odps.cupid.spark.r.archive=/path/to/R-PreCompile-Package.zip

# Cupid Longtime Job
# spark.hadoop.odps.cupid.engine.running.type=longtime
# spark.hadoop.odps.cupid.job.capability.duration.hours=8640
# spark.hadoop.odps.moye.trackurl.dutation=8640

# spark.r.command=/home/admin/R/bin/Rscript
# spark.hadoop.odps.cupid.disk.driver.enable=false
spark.hadoop.odps.cupid.bearer.token.enable=false
spark.hadoop.odps.exec.dynamic.partition.mode=nonstrict
```

## 2. Get and Build the Example Template

Run the following commands to fetch and build the example template spark-examples:

```
git clone https://github.com/aliyun/aliyun-cupid-sdk.git
cd aliyun-cupid-sdk
mvn -T 1C clean install -DskipTests
```

## 3. Maven Dependencies

The GitHub project from the previous section, spark-examples, can be used as a QuickStart template. If you develop your own project, make sure your pom.xml contains the following. Use the community 2.1.0 version for the Spark modules and make sure their scope is provided.

```
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
    <scope>provided</scope>
</dependency>
```

The MaxCompute plugins have been published to the Maven repository; add the following dependencies.

```
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>cupid-core_2.11</artifactId>
    <version>1.0.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>cupid-datasource_2.11</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>cupid-client_2.11</artifactId>
    <version>1.0.0</version>
</dependency>
```

The dependencies are:

- cupid-core: core code of the Cupid platform, including the Cupid task submission wrappers and the parent/child-process table read/write interfaces.

```
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>cupid-core_2.11</artifactId>
    <version>1.0.0</version>
</dependency>
```

- cupid-datasource: wraps the user-facing API for reading and writing MaxCompute tables from Spark.

```
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>cupid-datasource_2.11</artifactId>
    <version>1.0.0</version>
</dependency>
```

- cupid-client: the user SDK for Cupid Client mode.

```
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>cupid-client_2.11</artifactId>
    <version>1.0.0</version>
</dependency>
```

## 4. Common Interfaces and Running Applications

Spark currently supports submitting jobs to the cluster through four interfaces: Spark Shell, SparkR, Spark SQL, and Spark JDBC. The following sections show how to run applications with each of them.

### 4.1 Spark Shell

Run the following commands to start the application:

```
$cd $SPARK_HOME
$bin/spark-shell --master yarn
```

A simple example:

```
sc.parallelize(0 to 100, 2).collect
sql("show tables").show
sql("select * from spark_user_data").show(200,100)
```

### 4.2 Spark R

Run the following commands to start the application:

```
// Create the R directory and extract R.zip into it
$mkdir -p /home/admin/R && unzip ./R/R.zip -d /home/admin/R/
// Set the environment variable
$export PATH=/home/admin/R/bin/:$PATH
// Choose the run mode and start the application
$bin/sparkR --master yarn --archives ./R/R.zip
```

A simple example:

```
df <- as.DataFrame(faithful)
df
head(select(df, df$eruptions))
head(select(df, "eruptions"))
head(filter(df, df$waiting < 50))
results <- sql("FROM spark_user_data SELECT *")
head(results)
```

### 4.3 Spark SQL

Run the following commands to start the application:

```
// Enter the Spark directory
$cd $SPARK_HOME
// Choose the run mode and start the application
$bin/spark-sql --master yarn
```

Example:

```
show tables;
select * from spark_user_data limit 3;
quit;
```

### 4.4 Spark JDBC

Run the following commands to start the application:

```
// Stop the Thrift server
$sbin/stop-thriftserver.sh
// Restart the Thrift server
$sbin/start-thriftserver.sh
// Start the application
$bin/beeline
```

A simple example:

```
!connect jdbc:hive2://localhost:10000/odps_smoke_test
show tables;
select * from mr_input limit 3;
!quit
```

## 5. Common Use-Case Demos

This chapter walks through demos of common use cases to help you get familiar with Spark on MaxCompute. You need to clone and build the GitHub project before running the demos.

```
// Clone the GitHub project
git clone https://github.com/aliyun/aliyun-cupid-sdk.git
cd aliyun-cupid-sdk
// Build the project
mvn -T 1C clean install -DskipTests
```

The build produces the jar files that are used in the specific demos below.

### 5.1 WordCount

Run a simple WordCount on Spark. The sample code is as follows:

```
package com.aliyun.odps.spark.examples

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("WordCount")
      .getOrCreate()
    val sc = spark.sparkContext

    try {
      sc.parallelize(1 to 100, 10).map(word => (word, 1)).reduceByKey(_ + _, 10).take(100).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Submit the job with the following command:

```
bin/spark-submit \
  --master yarn-cluster \
  --class com.aliyun.odps.spark.examples.WordCount \
  /path/to/aliyun-cupid-sdk/examples/spark-examples/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
```

### 5.2 Accessing OSS

Spark can access data stored on OSS, for example:

```
package com.aliyun.odps.spark.examples.oss

import org.apache.spark.sql.SparkSession

object SparkUnstructuredDataCompute {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("SparkUnstructuredDataCompute")
      .config("spark.hadoop.fs.oss.accessKeyId", "***")
      .config("spark.hadoop.fs.oss.accessKeySecret", "***")
      .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
      .getOrCreate()
    val sc = spark.sparkContext

    try {
      val pathIn = "oss://bucket/inputdata/"
      val inputData = sc.textFile(pathIn, 5)
      val cnt = inputData.count
      println(s"count: $cnt")
    } finally {
      sc.stop()
    }
  }
}
```

Submit the job with the following command:

```
./bin/spark-submit \
  --jars cupid/hadoop-aliyun-package-3.0.0-alpha2-odps-jar-with-dependencies.jar \
  --class com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \
  /path/to/aliyun-cupid-sdk/examples/spark-examples/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
```

### 5.3 MaxCompute Table Read/Write

Read and write MaxCompute tables as Spark RDDs.

```
⚠️ Note: the Project/Table used in the demos must exist, or be changed to your own Project/Table.
```

The sample code is as follows.

#### 5.3.1 Read a Non-Partitioned Table

Java API:

```
/**
 * read from normal table via rdd api
 * desc cupid_wordcount;
 * +------------------------------------------------------------------------------------+
 * | Field | Type | Label | Comment |
 * +------------------------------------------------------------------------------------+
 * | id | string | | |
 * | value | string | | |
 * +------------------------------------------------------------------------------------+
 */
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaOdpsOps javaOdpsOps = new JavaOdpsOps(ctx);
JavaRDD<Tuple2<String, String>> rdd_0 = javaOdpsOps.readTable(
        projectName,
        "cupid_wordcount",
        new Function2<Record, TableSchema, Tuple2<String, String>>() {
            public Tuple2<String, String> call(Record v1, TableSchema v2) throws Exception {
                return new Tuple2<String, String>(v1.getString(0), v1.getString(1));
            }
        },
        0
);
```

Scala API:

```
/**
 * read from normal table via rdd api
 * desc cupid_wordcount;
 * +------------------------------------------------------------------------------------+
 * | Field | Type | Label | Comment |
 * +------------------------------------------------------------------------------------+
 * | id | string | | |
 * | value | string | | |
 * +------------------------------------------------------------------------------------+
 */
val sc = new SparkContext(conf)
val odpsOps = new OdpsOps(sc)
val rdd_0 = odpsOps.readTable(
  projectName,
  "cupid_wordcount",
  (r: Record, schema: TableSchema) => (r.getString(0), r.getString(1))
)
```

#### 5.3.2 Read a Table with a Single Partition Column

Java API:

```
/**
 * read from single partition column table via rdd api
 * desc dftest_single_parted;
 * +------------------------------------------------------------------------------------+
 * | Field | Type | Label | Comment |
 * +------------------------------------------------------------------------------------+
 * | id | string | | |
 * | value | string | | |
 * +------------------------------------------------------------------------------------+
 * | Partition Columns: |
 * +------------------------------------------------------------------------------------+
 * | pt | string | |
 * +------------------------------------------------------------------------------------+
 */
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaOdpsOps javaOdpsOps = new JavaOdpsOps(ctx);
JavaRDD<Tuple3<String, String, String>> rdd_1 = javaOdpsOps.readTable(
        projectName,
        "dftest_single_parted",
        "pt=20160101",
        new Function2<Record, TableSchema, Tuple3<String, String, String>>() {
            public Tuple3<String, String, String> call(Record v1, TableSchema v2) throws Exception {
                return new Tuple3<String, String, String>(v1.getString(0), v1.getString(1), v1.getString("pt"));
            }
        },
        0
);
```

Scala API:

```
/**
 * read from single partition column table via rdd api
 * desc dftest_single_parted;
 * +------------------------------------------------------------------------------------+
 * | Field | Type | Label | Comment |
 * +------------------------------------------------------------------------------------+
 * | id | string | | |
 * | value | string | | |
 * +------------------------------------------------------------------------------------+
 * | Partition Columns: |
 * +------------------------------------------------------------------------------------+
 * | pt | string | |
 * +------------------------------------------------------------------------------------+
 */
val sc = new SparkContext(conf)
val odpsOps = new OdpsOps(sc)
val rdd_1 = odpsOps.readTable(
  projectName,
  "dftest_single_parted",
  Array("pt=20160101"),
  (r: Record, schema: TableSchema) => (r.getString(0), r.getString(1), r.getString("pt"))
)
```

#### 5.3.3 Read a Table with Multiple Partition Columns

Java API:

```
/**
 * read from multi partition column table via rdd api
 * desc dftest_parted;
 * +------------------------------------------------------------------------------------+
 * | Field | Type | Label | Comment |
 * +------------------------------------------------------------------------------------+
 * | id | string | | |
 * | value | string | | |
 * +------------------------------------------------------------------------------------+
 * | Partition Columns: |
 * +------------------------------------------------------------------------------------+
 * | pt | string | |
 * | hour | string | |
 * +------------------------------------------------------------------------------------+
 */
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaOdpsOps javaOdpsOps = new JavaOdpsOps(ctx);
JavaRDD<Tuple4<String, String, String, String>> rdd_2 = javaOdpsOps.readTable(
        projectName,
        "dftest_parted",
        "pt=20160101,hour=12",
        new Function2<Record, TableSchema, Tuple4<String, String, String, String>>() {
            public Tuple4<String, String, String, String> call(Record v1, TableSchema v2) throws Exception {
                return new Tuple4<String, String, String, String>(v1.getString(0), v1.getString(1), v1.getString("pt"), v1.getString(3));
            }
        },
        0
);
```

Scala API:

```
/**
 * read from multi partition column table via rdd api
 * desc dftest_parted;
 * +------------------------------------------------------------------------------------+
 * | Field | Type | Label | Comment |
 * +------------------------------------------------------------------------------------+
 * | id | string | | |
 * | value | string | | |
 * +------------------------------------------------------------------------------------+
 * | Partition Columns: |
 * +------------------------------------------------------------------------------------+
 * | pt | string | |
 * | hour | string | |
 * +------------------------------------------------------------------------------------+
 */
val sc = new SparkContext(conf)
val odpsOps = new OdpsOps(sc)
val rdd_2 = odpsOps.readTable(
  projectName,
  "dftest_parted",
  Array("pt=20160101,hour=12"),
  (r: Record, schema: TableSchema) => (r.getString(0), r.getString(1), r.getString("pt"), r.getString(3))
)
```

#### 5.3.4 Read Multiple Partitions (Multiple Columns per Partition)

Java API:

```
/**
 * read with multi partitionSpec definition via rdd api
 * desc cupid_partition_table1;
 * +------------------------------------------------------------------------------------+
 * | Field | Type | Label | Comment |
 * +------------------------------------------------------------------------------------+
 * | id | string | | |
 * | value | string | | |
 * +------------------------------------------------------------------------------------+
 * | Partition Columns: |
 * +------------------------------------------------------------------------------------+
 * | pt1 | string | |
 * | pt2 | string | |
 * +------------------------------------------------------------------------------------+
 */
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaOdpsOps javaOdpsOps = new JavaOdpsOps(ctx);
JavaRDD<Tuple4<String, String, String, String>> rdd_3 = javaOdpsOps.readTable(
        projectName,
        "cupid_partition_table1",
        new String[]{"pt1=part1,pt2=part1", "pt1=part1,pt2=part2", "pt1=part2,pt2=part3"},
        new Function2<Record, TableSchema, Tuple4<String, String, String, String>>() {
            public Tuple4<String, String, String, String> call(Record v1, TableSchema v2) throws Exception {
                return new Tuple4<String, String, String, String>(v1.getString(0), v1.getString(1), v1.getString("pt1"), v1.getString("pt2"));
            }
        },
        0
);
```

Scala API:

```
/**
 * read with multi partitionSpec definition via rdd api
 * desc cupid_partition_table1;
 * +------------------------------------------------------------------------------------+
 * | Field | Type | Label | Comment |
 * +------------------------------------------------------------------------------------+
 * | id | string | | |
 * | value | string | | |
 * +------------------------------------------------------------------------------------+
 * | Partition Columns: |
 * +------------------------------------------------------------------------------------+
 * | pt1 | string | |
 * | pt2 | string | |
 * +------------------------------------------------------------------------------------+
 */
val sc = new SparkContext(conf)
val odpsOps = new OdpsOps(sc)
val rdd_3 = odpsOps.readTable(
  projectName,
  "cupid_partition_table1",
  Array("pt1=part1,pt2=part1", "pt1=part1,pt2=part2", "pt1=part2,pt2=part3"),
  (r: Record, schema: TableSchema) => (r.getString(0), r.getString(1), r.getString("pt1"), r.getString("pt2"))
)
```

#### 5.3.5 Save an RDD to a Non-Partitioned Table

Java API:

```
/**
 * save rdd into normal table
 * desc cupid_wordcount_empty;
 * +------------------------------------------------------------------------------------+
 * | Field | Type | Label | Comment |
 * +------------------------------------------------------------------------------------+
 * | id | string | | |
 * | value | string | | |
 * +------------------------------------------------------------------------------------+
 */
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaOdpsOps javaOdpsOps = new JavaOdpsOps(ctx);
VoidFunction3<Tuple2<String, String>, Record, TableSchema> transfer_0 =
        new VoidFunction3<Tuple2<String, String>, Record, TableSchema>() {
            @Override
            public void call(Tuple2<String, String> v1, Record v2, TableSchema v3) throws Exception {
                v2.set("id", v1._1());
                v2.set(1, v1._2());
            }
        };
javaOdpsOps.saveToTable(projectName, "cupid_wordcount_empty", rdd_0.rdd(), transfer_0, true);
```

Scala API:

```
/**
 * save rdd into normal table
 * desc cupid_wordcount_empty;
 * +------------------------------------------------------------------------------------+
 * | Field | Type | Label | Comment |
 * +------------------------------------------------------------------------------------+
 * | id | string | | |
 * | value | string | | |
 * +------------------------------------------------------------------------------------+
 */
val sc = new SparkContext(conf)
val odpsOps = new OdpsOps(sc)
val transfer_0 = (v: Tuple2[String, String], record: Record, schema: TableSchema) => {
  record.set("id", v._1)
  record.set(1, v._2)
}
odpsOps.saveToTable(projectName, "cupid_wordcount_empty", rdd_0, transfer_0, true)
```

#### 5.3.6 Save an RDD to a Specific Partition of a Partitioned Table

Java API:

```
/**
 * save rdd into partition table with single partition spec
 * desc cupid_partition_table1;
 * +------------------------------------------------------------------------------------+
 * | Field | Type | Label | Comment |
 * +------------------------------------------------------------------------------------+
 * | id | string | | |
 * | value | string | | |
 * +------------------------------------------------------------------------------------+
 * | Partition Columns: |
 * +------------------------------------------------------------------------------------+
 * | pt1 | string | |
 * | pt2 | string | |
 * +------------------------------------------------------------------------------------+
 */
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaOdpsOps javaOdpsOps = new JavaOdpsOps(ctx);
VoidFunction3<Tuple2<String, String>, Record, TableSchema> transfer_1 =
        new VoidFunction3<Tuple2<String, String>, Record, TableSchema>() {
            @Override
            public void call(Tuple2<String, String> v1, Record v2, TableSchema v3) throws Exception {
                v2.set("id", v1._1());
                v2.set("value", v1._2());
            }
        };
javaOdpsOps.saveToTable(projectName, "cupid_partition_table1", "pt1=test,pt2=dev", rdd_0.rdd(), transfer_1, true);
```

Scala API:

```
/**
 * save rdd into partition table with single partition spec
 * desc cupid_partition_table1;
 * +------------------------------------------------------------------------------------+
 * | Field | Type | Label | Comment |
 * +------------------------------------------------------------------------------------+
 * | id | string | | |
 * | value | string | | |
 * +------------------------------------------------------------------------------------+
 * | Partition Columns: |
 * +------------------------------------------------------------------------------------+
 * | pt1 | string | |
 * | pt2 | string | |
 * +------------------------------------------------------------------------------------+
 */
val sc = new SparkContext(conf)
val odpsOps = new OdpsOps(sc)
val transfer_1 = (v: Tuple2[String, String], record: Record, schema: TableSchema) => {
  record.set("id", v._1)
  record.set("value", v._2)
}
odpsOps.saveToTable(projectName, "cupid_partition_table1", "pt1=test,pt2=dev", rdd_0, transfer_1, true)
```

#### 5.3.7 Save an RDD to a Partitioned Table with Dynamic Partitioning

Java API:

```
/**
 * dynamic save rdd into partition table with multiple partition spec
 * desc cupid_partition_table1;
 * +------------------------------------------------------------------------------------+
 * | Field | Type | Label | Comment |
 * +------------------------------------------------------------------------------------+
 * | id | string | | |
 * | value | string | | |
 * +------------------------------------------------------------------------------------+
 * | Partition Columns: |
 * +------------------------------------------------------------------------------------+
 * | pt1 | string | |
 * | pt2 | string | |
 * +------------------------------------------------------------------------------------+
 */
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaOdpsOps javaOdpsOps = new JavaOdpsOps(ctx);
VoidFunction4<Tuple2<String, String>, Record, PartitionSpec, TableSchema> transfer_2 =
        new VoidFunction4<Tuple2<String, String>, Record, PartitionSpec, TableSchema>() {
            @Override
            public void call(Tuple2<String, String> v1, Record v2, PartitionSpec v3, TableSchema v4) throws Exception {
                v2.set("id", v1._1());
                v2.set("value", v1._2());
                String pt1_value = new Random().nextInt(10) % 2 == 0 ? "even" : "odd";
                String pt2_value = new Random().nextInt(10) % 2 == 0 ? "even" : "odd";
                v3.set("pt1", pt1_value);
                v3.set("pt2", pt2_value);
            }
        };
javaOdpsOps.saveToTableForMultiPartition(projectName, "cupid_partition_table1", rdd_0.rdd(), transfer_2, true);
```

Scala API:

```
/**
 * dynamic save rdd into partition table with multiple partition spec
 * desc cupid_partition_table1;
 * +------------------------------------------------------------------------------------+
 * | Field | Type | Label | Comment |
 * +------------------------------------------------------------------------------------+
 * | id | string | | |
 * | value | string | | |
 * +------------------------------------------------------------------------------------+
 * | Partition Columns: |
 * +------------------------------------------------------------------------------------+
 * | pt1 | string | |
 * | pt2 | string | |
 * +------------------------------------------------------------------------------------+
 */
val sc = new SparkContext(conf)
val odpsOps = new OdpsOps(sc)
val transfer_2 = (v: Tuple2[String, String], record: Record, part: PartitionSpec, schema: TableSchema) => {
  record.set("id", v._1)
  record.set("value", v._2)
  val pt1_value = if (new Random().nextInt(10) % 2 == 0) "even" else "odd"
  val pt2_value = if (new Random().nextInt(10) % 2 == 0) "even" else "odd"
  part.set("pt1", pt1_value)
  part.set("pt2", pt2_value)
}
odpsOps.saveToTableForMultiPartition(projectName, "cupid_partition_table1", rdd_0, transfer_2, true)
```

Submit the job with the following command:

```
bin/spark-submit \
  --master yarn-cluster \
  --class com.aliyun.odps.spark.examples.OdpsTableReadWrite \
  /path/to/aliyun-cupid-sdk/examples/spark-examples/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
```

The read parallelism for MaxCompute tables can be tuned in two ways (see the sketch after this list):

- Adjust spark.hadoop.odps.input.split.size: the larger the value, the fewer map tasks; the default is 256 MB.
- Set numPartition in OdpsOps.readTable, which directly determines the number of map tasks; by default it is derived from spark.hadoop.odps.input.split.size.
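A minimal sketch of both tuning knobs, written in the same fragment style as the Scala examples above (imports omitted, as in those snippets). The split-size value of 512 and the numPartition value of 20 are illustrative assumptions, as is the assumption that the Scala readTable accepts the trailing numPartition argument the same way the Java examples pass it (where it is 0, meaning "derive the task count from the split size").

```
// Option 1: a larger split size means fewer map tasks (the default is 256 MB).
val conf = new SparkConf()
  .setAppName("ReadParallelismSketch")
  .set("spark.hadoop.odps.input.split.size", "512")
val sc = new SparkContext(conf)
val odpsOps = new OdpsOps(sc)

// Option 2: pass numPartition explicitly as the trailing argument of readTable;
// here 20 map tasks are requested for the cupid_wordcount table used above.
val rdd = odpsOps.readTable(
  projectName,
  "cupid_wordcount",
  (r: Record, schema: TableSchema) => (r.getString(0), r.getString(1)),
  20
)
println(s"partitions: ${rdd.getNumPartitions}")
```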

### 5.4 MaxCompute Table Spark SQL

Read and write MaxCompute tables through the SQL context.

```
⚠️ Note:
- The Project/Table used in the demo must exist, or be changed to your own Project/Table.
- spark.sql.catalogImplementation=odps must be set in spark-defaults.conf.
```

The sample code is as follows:

```
package com.aliyun.odps.spark.examples

import org.apache.spark.sql.SparkSession

object OdpsTableReadWriteViaSQL {
  def main(args: Array[String]) {
    // please make sure spark.sql.catalogImplementation=odps in spark-defaults.conf
    // to enable odps catalog
    val spark = SparkSession
      .builder()
      .appName("OdpsTableReadWriteViaSQL")
      .getOrCreate()

    val projectName = spark.sparkContext.getConf.get("odps.project.name")
    val tableName = "cupid_wordcount"

    // get a ODPS table as a DataFrame
    val df = spark.table(tableName)
    println(s"df.count: ${df.count()}")

    // Just do some query
    spark.sql(s"select * from $tableName limit 10").show(10, 200)
    spark.sql(s"select id, count(id) from $tableName group by id").show(10, 200)

    // any table exists under project could be use
    // productRevenue
    spark.sql(
      """
        |SELECT product,
        |       category,
        |       revenue
        |FROM
        |  (SELECT product,
        |          category,
        |          revenue,
        |          dense_rank() OVER (PARTITION BY category
        |                             ORDER BY revenue DESC) AS rank
        |   FROM productRevenue) tmp
        |WHERE rank <= 2
      """.stripMargin).show(10, 200)

    spark.stop()
  }
}
```

Submit the job with the following command:

```
bin/spark-submit \
  --master yarn-cluster \
  --class com.aliyun.odps.spark.examples.OdpsTableReadWriteViaSQL \
  /path/to/aliyun-cupid-sdk/examples/spark-examples/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
```

### 5.5 MaxCompute Client Mode

For security reasons, machines inside MaxCompute cannot be connected to directly, so native Spark's yarn-client mode is not available. To support interactive scenarios, the MaxCompute team developed its own Client mode.

The sample code is as follows:

```
package com.aliyun.odps.spark.examples

import com.aliyun.odps.cupid.client.spark.client.CupidSparkClientRunner

object SparkClientNormalFT {
  def main(args: Array[String]) {
    val cupidSparkClient = CupidSparkClientRunner.getReadyCupidSparkClient()
    val jarPath = args(0) // client-jobexamples jar path
    val sparkClientNormalApp = new SparkClientNormalApp(cupidSparkClient)
    sparkClientNormalApp.runNormalJob(jarPath)
    cupidSparkClient.stopRemoteDriver()
  }
}
```

The complete Client-mode demo consists of the following files:

- Client control

```
spark-examples/src/main/scala/com/aliyun/odps/spark/examples/SparkClientNormalFT.scala
spark-examples/src/main/scala/com/aliyun/odps/spark/examples/SparkClientNormalApp.scala
```

- SparkJob units

```
client-jobexamples/src/main/scala/com/aliyun/odps/cupid/client/spark/CacheRddJob.scala
client-jobexamples/src/main/scala/com/aliyun/odps/cupid/client/spark/ParallRddJob.scala
client-jobexamples/src/main/scala/com/aliyun/odps/cupid/client/spark/UseCacheRdd.scala
client-jobexamples/src/main/scala/com/aliyun/odps/cupid/client/spark/SparkJobKill.scala
```

Submit the job with the following command:

```
java -cp \
  /path/to/aliyun-cupid-sdk/examples/spark-examples/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar:$SPARK_HOME/jars/* \
  com.aliyun.odps.spark.examples.SparkClientNormalFT \
  /path/to/aliyun-cupid-sdk/examples/client-jobexamples/target/client-jobexamples_2.11-1.0.0-SNAPSHOT.jar
```

### 5.6 MaxCompute Table PySpark

Read and write MaxCompute tables from PySpark. The sample code is as follows:

```
from odps.odps_sdk import OdpsOps
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, DataFrame

if __name__ == '__main__':
    conf = SparkConf().setAppName("odps_pyspark")
    sc = SparkContext(conf=conf)
    sql_context = SQLContext(sc)
    project_name = "cupid_testa1"
    in_table_name = "cupid_wordcount"
    out_table_name = "cupid_wordcount_py"

    normal_df = OdpsOps.read_odps_table(sql_context, project_name, in_table_name)
    for i in normal_df.sample(False, 0.01).collect():
        print i
    print "Read normal odps table finished"

    OdpsOps.write_odps_table(sql_context, normal_df.sample(False, 0.001), project_name, out_table_name)
    print "Write normal odps table finished"
```

Submit the job with the following command:

```
spark-submit \
  --master yarn-cluster \
  --jars /path/to/aliyun-cupid-sdk/external/cupid-datasource/target/cupid-datasource_2.11-1.0.0-SNAPSHOT.jar \
  --py-files /path/to/aliyun-cupid-sdk/examples/spark-examples/src/main/python/odps.zip \
  /path/to/aliyun-cupid-sdk/examples/spark-examples/src/main/python/odps_table_rw.py
```

### 5.7 MLlib

It is recommended to read and write MLlib models through OSS. The sample code is as follows:

```
package com.aliyun.odps.spark.examples.mllib

import org.apache.spark.mllib.clustering.KMeans._
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SparkSession

object KmeansModelSaveToOss {
  val modelOssDir = "oss://bucket/kmeans-model"

  def main(args: Array[String]) {
    // 1. train and save the model
    val spark = SparkSession
      .builder()
      .appName("KmeansModelSaveToOss")
      .config("spark.hadoop.fs.oss.accessKeyId", "***")
      .config("spark.hadoop.fs.oss.accessKeySecret", "***")
      .config("spark.hadoop.fs.oss.endpoint", "***")
      .getOrCreate()
    val sc = spark.sparkContext

    val points = Seq(
      Vectors.dense(0.0, 0.0),
      Vectors.dense(0.0, 0.1),
      Vectors.dense(0.1, 0.0),
      Vectors.dense(9.0, 0.0),
      Vectors.dense(9.0, 0.2),
      Vectors.dense(9.2, 0.0)
    )
    val rdd = sc.parallelize(points, 3)
    val initMode = K_MEANS_PARALLEL
    val model = KMeans.train(rdd, k = 2, maxIterations = 2, runs = 1, initMode)
    val predictResult1 = rdd.map(feature => "cluster id: " + model.predict(feature) + " feature:" + feature.toArray.mkString(",")).collect
    println("modelOssDir=" + modelOssDir)
    model.save(sc, modelOssDir)

    // 2. predict from the oss model
    val modelLoadOss = KMeansModel.load(sc, modelOssDir)
    val predictResult2 = rdd.map(feature => "cluster id: " + modelLoadOss.predict(feature) + " feature:" + feature.toArray.mkString(",")).collect
    assert(predictResult1.size == predictResult2.size)
    predictResult2.foreach(result2 => assert(predictResult1.contains(result2)))
  }
}
```

Submit the job with the following command:

```
./bin/spark-submit \
  --jars cupid/hadoop-aliyun-package-3.0.0-alpha2-odps-jar-with-dependencies.jar \
  --class com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \
  /path/to/aliyun-cupid-sdk/examples/spark-examples/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
```

### 5.8 Interactive PySpark

PySpark can only be run on machines that can connect to the compute cluster directly. Install Python 2.7 and add it to PATH:

```
export PATH=/path/to/python/bin/:$PATH
```

Start command:

```
bin/pyspark --master yarn-client
```

Interactive session:

```
df=spark.sql("select * from spark_user_data")
df.show()
```

### 5.9 Interactive spark-shell (Reading Tables)

spark-shell can only be run on machines that can connect to the compute cluster directly.

Start command:

```
bin/spark-shell --master yarn
```

Interactive session:

```
sc.parallelize(0 to 100, 2).collect
sql("show tables").show
sql("select * from spark_user_data").show(200,100)
```

### 5.10 Interactive spark-shell (MLlib + OSS Read/Write)

spark-shell can only be run on machines that can connect to the compute cluster directly. Add the following settings to conf/spark-defaults.conf:

```
spark.hadoop.fs.oss.accessKeyId=***
spark.hadoop.fs.oss.accessKeySecret=***
spark.hadoop.fs.oss.endpoint=***
```

Start command:

```
bin/spark-shell --master yarn --jars cupid/hadoop-aliyun-package-3.0.0-alpha2-odps-jar-with-dependencies.jar
```

Interactive session:

```
import org.apache.spark.mllib.clustering.KMeans._
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

val modelOssDir = "oss://your_bucket/kmeans-model"
val points = Seq(
  Vectors.dense(0.0, 0.0),
  Vectors.dense(0.0, 0.1),
  Vectors.dense(0.1, 0.0),
  Vectors.dense(9.0, 0.0),
  Vectors.dense(9.0, 0.2),
  Vectors.dense(9.2, 0.0)
)
val rdd = sc.parallelize(points, 3)
val initMode = K_MEANS_PARALLEL
val model = KMeans.train(rdd, k = 2, maxIterations = 2, runs = 1, initMode)
val predictResult1 = rdd.map(feature => "cluster id: " + model.predict(feature) + " feature:" + feature.toArray.mkString(",")).collect
println("modelOssDir=" + modelOssDir)
model.save(sc, modelOssDir)

val modelLoadOss = KMeansModel.load(sc, modelOssDir)
val predictResult2 = rdd.map(feature => "cluster id: " + modelLoadOss.predict(feature) + " feature:" + feature.toArray.mkString(",")).collect
assert(predictResult1.size == predictResult2.size)
predictResult2.foreach(result2 => assert(predictResult1.contains(result2)))
```

### 5.11 Interactive SparkR

SparkR can only be run on machines that can connect to the compute cluster directly. R must be installed under /home/admin/R and added to PATH:

```
export PATH=/home/admin/R/bin/:$PATH
```

Start command:

```
bin/sparkR --master yarn --archives ./R/R.zip
```

Interactive session:

```
df <- as.DataFrame(faithful)
df
head(select(df, df$eruptions))
head(select(df, "eruptions"))
head(filter(df, df$waiting < 50))
results <- sql("FROM spark_user_data SELECT *")
head(results)
```

### 5.12 GraphX PageRank

Native GraphX is supported. The sample code is as follows:

```
package com.aliyun.odps.spark.examples.graphx

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object PageRank {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("pagerank")
    val sc = new SparkContext(conf)

    // construct vertices
    val users: RDD[(VertexId, Array[String])] = sc.parallelize(List(
      "1,BarackObama,Barack Obama",
      "2,ladygaga,Goddess of Love",
      "3,jeresig,John Resig",
      "4,justinbieber,Justin Bieber",
      "6,matei_zaharia,Matei Zaharia",
      "7,odersky,Martin Odersky",
      "8,anonsys"
    ).map(line => line.split(",")).map(parts => (parts.head.toLong, parts.tail)))

    // construct edges
    val followers: RDD[Edge[Double]] = sc.parallelize(Array(
      Edge(2L, 1L, 1.0),
      Edge(4L, 1L, 1.0),
      Edge(1L, 2L, 1.0),
      Edge(6L, 3L, 1.0),
      Edge(7L, 3L, 1.0),
      Edge(7L, 6L, 1.0),
      Edge(6L, 7L, 1.0),
      Edge(3L, 7L, 1.0)
    ))

    // construct graph
    val followerGraph: Graph[Array[String], Double] = Graph(users, followers)

    // restrict the graph to users with usernames and names
    val subgraph = followerGraph.subgraph(vpred = (vid, attr) => attr.size == 2)

    // compute PageRank
    val pageRankGraph = subgraph.pageRank(0.001)

    // get attributes of the top pagerank users
    val userInfoWithPageRank = subgraph.outerJoinVertices(pageRankGraph.vertices) {
      case (uid, attrList, Some(pr)) => (pr, attrList.toList)
      case (uid, attrList, None) => (0.0, attrList.toList)
    }

    println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
  }
}
```

Submit the job with the following command:

```
bin/spark-submit \
  --master yarn-cluster \
  --class com.aliyun.odps.spark.examples.graphx.PageRank \
  /path/to/aliyun-cupid-sdk/examples/spark-examples/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
```

### 5.13 Spark Streaming NetworkWordCount

Native Spark Streaming is supported. Taking NetworkWordCount as an example, first install Netcat locally and run:

```
$ nc -lk 9999
```

Whatever you type into this console becomes the input of the Spark Streaming job. The sample code is as follows:

```
package com.aliyun.odps.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.examples.streaming.StreamingExamples
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Submit the job with the following command:

```
bin/spark-submit \
  --master local[4] \
  --class com.aliyun.odps.spark.examples.streaming.NetworkWordCount \
  /path/to/aliyun-cupid-sdk/examples/spark-examples/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar localhost 9999
```

## 6. Special Notes

This chapter covers a few special cases you should be aware of.

### 6.1 Special Configuration for Spark Streaming Jobs

MaxCompute supports Spark Streaming. Unlike batch jobs, long-running streaming jobs need a few extra settings; they also go into spark-defaults.conf:

```
# Set the running type to longtime so the job is not reclaimed.
spark.hadoop.odps.cupid.engine.running.type=longtime
# How long the job is allowed to run, in hours.
spark.hadoop.odps.cupid.job.capability.duration.hours=25920
# Maximum number of failover attempts.
spark.yarn.maxAppAttempts=10
# Whether to enable the write-ahead log; it prevents data loss at the cost of throughput.
spark.streaming.receiver.writeAheadLog.enable=true
```
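When the write-ahead log is enabled, the streaming context also needs a checkpoint directory where the log segments can be stored durably. Below is a minimal sketch based on the NetworkWordCount example from section 5.13; the object name, the OSS checkpoint path, and the localhost:9999 source are hypothetical placeholders.

```
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedNetworkWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("CheckpointedNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Needed when spark.streaming.receiver.writeAheadLog.enable=true:
    // received blocks are logged under the checkpoint directory before processing.
    ssc.checkpoint("oss://your_bucket/streaming-checkpoint") // hypothetical path

    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```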

### 6.2 Tracking URL

After the job is submitted, you will usually see output like the following:

```
17/08/28 14:53:26 INFO Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: 11.137.199.2
     ApplicationMaster RPC port: 57524
     queue: queue
     start time: 1503903179541
     final status: SUCCEEDED
     tracking URL: http://jobview.odps.aliyun-inc.com/proxyview/jobview/?h=http://service.odps.aliyun-inc.com/api&p=odps_public_dev&i=20170828065141675g5h4t6u1&t=spark&id=application_1503903039442_1185611255&metaname=20170828065141675g5h4t6u1&token=L0dSMHRkSlNXS2ZkeFE1UkVsckthTTZQWHV3PSxPRFBTX09CTzoxMDU5NTgyNzI0MzIyOTk5LDE1MDQxNjIzODMseyJTdGF0ZW1lbnQiOlt7IkFjdGlvbiI6WyJvZHBzOlJlYWQiXSwiRWZmZWN0IjoiQWxsb3ciLCJSZXNvdXJjZSI6WyJhY3M6b2RwczoqOnByb2plY3RzL29kcHNfcHVibGljX2Rldi9pbnN0YW5jZXMvMjAxNzA4MjgwNjUxNDE2NzVnNWg0dDZ1MSJdfV0sIlZlcnNpb24iOiIxIn0=
     user: user
```

The tracking URL in the output means the job has been submitted to the MaxCompute cluster. This URL is important: it serves both as the Spark Web UI and as the History Server URL.

## 7. References

For more information about Spark, see [Spark Configuration](https://spark.apache.org/docs/2.1.0/configuration.html).

## 8. License

Licensed under the Apache License 2.0