简介
POLARDB数据库是阿里云开发的下一代关系型云数据库。100%兼容MySQL,性能高达MySQL的6倍。但是随着数据量的不断增加,面临着单个SQL无法分析结果的现状。X-Pack Spark为数据库提供了分析引擎,旨在创建一个闭环数据库。在X-Pack Spark的帮助下,POLARDB数据可以归档到以列存储的Parquet文件中,一条SQL可以分析复杂的数据,分析结果可以返回到业务库进行查询。本文主要介绍如何利用X-Pack Spark data workbench对POLARDB数据进行存档。
业务架构
业务需要为多种报表提供不同纬度的日报表和月报,并提供外部查询服务;最大的米目前是500G,数据量还在增加。我尝试了spark直接通过jdbc解析POLARDB。一方面是慢;另一方面,每次我扫描POLARDB的全部数据时,都会对在线业务产生影响。基于以下考虑因素选择POLARDB+Spark的架构:
选择POLARDB按天增量归档到spark列存,每天增量数据量比较少,选择业务低峰期归档,对在线查询无影响选择Spark作为报表分析引擎,因为Spark很适合做ETL,且内置支持数据回流到POLARDB、MongoDB等多种在线库选择Spark离线数仓作为数据的中转站,对于分析的结果数据回流到在线库提供查询,能够一条Spark SQL完成分析,不需要按维度值拆分多条分析SQL
前提条件
1。设置Spark以访问POLARDB白名单
Spark cluster和POLARDB只能在同一VPC下访问。目前X-Pack Spark上不支持POLARDB数据库的一键关联,所以需要将Spark集群的IP加入POLARDB白名单。以后会开通一键关联POLARDB的功能。
在“HBase控制台”->:在集群列表中找到分析Spark实例,在“数据库连接”列中找到“VSwitch ID”交换机ID,如下图所示:
然后在“VPC控制台的专用网络”->:”开关”搜索交换机实例ID以找到IPV4网段。
将Spark集群网络添加到POLARDB白名单中,进入“控制台”->:集群列表找到要关联的POLARDB实例,然后在基本信息->中:“访问信息”->:将“白名单”添加到Spark集群所属的网段中。
2。创建一个测试表
POLARDB中已经有一个测试表。如果没有可以登录到POLARDB数据库创建的测试表,下面将以这个测试表为例。
CREATE TABLE IF NOT EXISTS test.us_population ( state CHAR(2) NOT NULL PRIMARY KEY, city VARCHAR(10), population INTEGER, dt TIMESTAMP );INSERT INTO test.us_population VALUES('NY','New York',8143197, CURRENT_DATE );INSERT INTO test.us_population VALUES('CA','Los Angeles',3844829, CURRENT_DATE);INSERT INTO test.us_population VALUES('IL','Chicago',2842518, '2019-04-13');INSERT INTO test.us_population VALUES('TX','Houston',2016582, '2019-04-14');INSERT INTO test.us_population VALUES('PA','Philadelphia',1463281, '2019-04-13');INSERT INTO test.us_population VALUES('AZ','Phoenix',1461575, '2019-04-15');INSERT INTO test.us_population VALUES('SA','San Antonio',1256509, CURRENT_DATE);INSERT INTO test.us_population VALUES('SD','San Diego',1255540, CURRENT_DATE);INSERT INTO test.us_population VALUES('DL','Dallas',1213825, '2019-04-15');INSERT INTO test.us_population VALUES('SJ','San Jose',912332,'2019-04-15');
I .使用交互式工作台归档数据(调试、测试)
创建火花运行会话
在”HBase控制台”->”会话管理”创建一个会话,指定会话名,执行集群,如图所示:
在编辑器中输入Spark启动参数,并运行进程以用于交互式查询。
--driver-memory 1G --driver-cores 1--executor-cores 1--executor-memory 2G--num-executors 1--name spark_on_polardb--jars /spark-demo/mysql-connector-java-5.1.34.jar
参数描述:
注意:以上参数在测试环境下太小,数据量大时根据实际集群规格和数据量配置。
会话成功运行后,如下图所示:
存档数据的交互查询
创建火花映射POLARDB表
输入”HBase控制台”->”交互式查询”,在会话列表中选择上一步创建的“spark_on_polardb”,然后新建一个查询,指定查询名称,选择“SQL”作为查询类型,如图所示:
在查询输入框中输入Spark表构建语句,以与POLARDB表相关联。表构建语句是:
create table spark_polordbusing org.apache.spark.sql.jdbcoptions ( driver "com.mysql.jdbc.Driver", url "jdbc:mysql://pc-xxx.rwlb.rds.aliyuncs.com:3306", dbtable "test.us_population", user 'xxx', password 'xxxxxx')
参数描述:
点击运行,当查询状态为“成功”时,创建成功。
查询测试
在上一步创建的查询编辑器中输入查询语句,然后运行:
SELECT * FROM spark_polordb
查询成功后,返回的结果如下:
创建存档表
X-Pack Spark将POLARDB数据归档为Parquet列存储格式,一方面可以获得更好的压缩空,另一方面可以在后续的分析任务中获得更高的效率。
Spark创建parquet分区表的语句如下,它也是在第一步的交互式查询编辑中输入的:
CREATE table parquetTable(state CHAR(2), city VARCHAR(10), population int)USING parquetPARTITIONED BY(dt timestamp)
参数描述:
表构建成功后,可以将POLARDB数据写入Parquet表。
存档数据
数据归档可以通过查询POLARDB数据并将其写入parquet表来完成。操作语句是:
INSERT INTO parquetTable partition(dt) SELECT state, city, population, dt FROM spark_polordb
操作成功后,数据归档完成。查询拼花桌数据:
二。工作流程计划周期的存档(生产T+1的存档)
交互式查询主要用于测试和调试。归档一般需要t+1操作,每天定时对当前数据进行归档,这就需要工作流的周期性调度。在这里,我们将具体介绍如何利用工作流的周期性调度来实现t+1归档。
归档代码编写
在使用工作流之前,您需要创建相应的Spark作业。Spark archive POLARDB可以实现一个完整的作业,包括以下过程:
在Spark中创建POLARDB表映射表(前提POLARDB中表已经存在)创建Spark分区归档表将数据写入归档表
Cloud Spark提供了Spark archive POLARDB的代码演示,请参考github:SparkArchivePolarDB。
具体的归档代码要结合实际场景,归档不同的表格,设置具体的分区和归档条件等。
上传Spark存档作业资源
通过资源管理将jar包的spark存档演示代码上传到资源列表,jar包的下载地址:Spark存档工具演示下载。
自己写的Spark作业也需要放入一个jar包中上传到资源列表,后面的作业需要运行jar包中的存档作业。
创建一个Spark作业
进入“HBase控制台”->:”工作台”->"作业管理"→:"创建作业",如图
编辑作业内容
作业内容主要指定了Spark作业的运行参数,以及具体的归档作业编码类和入库参数等。,以SparkArchivePolarDB演示为例:
--class com.aliyun.spark.polardb.SparkOnPolarDBArchive--driver-memory 1G --driver-cores 1--executor-cores 1--executor-memory 2G--num-executors 1--jars /spark-demo/mysql-connector-java-5.1.34.jar/spark-demo/spark-examples-0.0.1-SNAPSHOT.jarpc-xxx.rwlb.rds.aliyuncs.com:3306 test.us_population username passwd sparkTestPolarDB
参数描述:
其他参数请参考上一节。
操作配置如图所示:
运行作业并查看结果。
可以在作业运行一段时间后查看运行状态,成功后在交互查询中查看存档表数据。
进入交互式工作台,参照上面的介绍查看存档表数据:
配置工作流
进入“HBase控制台”->:“数据工作台”->:工作流,选择一个新的工作流,指定工作流名称,描述和执行集群,
然后进入工作流设计工作台,拖动Spark作业并对其进行配置,选择之前的配置作业并连接:
选择”工作流配置”->”属性调度”,打开调度状态,设置其实时时间和调度周期,工作流将被定期调度,如图:
三。归档方式(输出表格形式)
完全存档
全卷归档方法主要用于归档原始数据库中的历史数据,或者用于数据量较小的表。存档步骤如下:
使用Spark的jdbc datasource创建POLARDB的映射表;在Spark中创建相同表结构的归档表,归档表使用Parquet列式存储,能够最大化节约存储空间,并加速分析性能;通过映射表读取POLARDB数据并写入Spark归档表,注意写入时保证字段顺序一致。
创建归档表时,如果表数据很大,可以创建分区表。分区策略一般分为时间分区和业务分区:
时间分区易于使用,即将相同时间的数据归档到同一个目录,比如选择按年或者按天进行时间分区,在分析时限定数据分区即可过滤掉与分析任务无关的数据。业务分区字段需要具有有限的类别,比如性别、年龄、部门等。业务分区需要结合具体业务进行考虑,分区个数不宜过多,spark默认最大分区数为1000。分区方式可以选择静态分区和动态分区,默认使用静态分区,即写入数据时必须指定写入哪个分区,动态分区需要将hive.exec.dynamic.partition.mode设置为nonstrict,写入时根据具体分区字段值动态创建分区,相同partition key值写入同一个分区。
使用示例如下:SparkOnPolarDBArchivedemo
增量存档
业务数据只是增量的
在业务表中,没有更新和删除数据的操作,只有对数据表的增量写入。在这种情况下,只需要在数据表中记录数据入库时间或其他标记来记录新数据。Spark中使用了工作流周期调度,引入了增量数据条件,新数据定期在Spark中存档。
业务数据更新
对于业务数据的更新数据,如果在原表中无法识别更新数据,目前只能采用全量归档的方式,一次归档一次全量数据,并可以覆盖原归档的表数据;;如果有更新数据标记,如update_time字段,则不能使用合并直接更新现有数据..因为Spark目前不支持ACID。增量更新归档步骤如下:
设置更新增量数据选择条件(归档表全量归档时已创建),如update_time大于某个日期;抽取增量更新的数据写入spark临时表;将历史数据归档表与增量更新数据表进行left out join并过滤出增量表字段为空的数据,表示历史数据中未参与增量更新的数据,然后与增量更新的数据进行union合并,写入Spark临时表;将临时表数据覆盖写入到归档表中作为新的归档数据参与后续业务分析。
目前Spark Update增量存档只能使用join关联的方法遍历所有数据完成数据更新,但好处是尽量避免影响在线库POLARDB的数据访问,一次只读取部分更新和增量数据,将计算工作放在廉价的Spark集群中。
示例如下:SparkOnPolarDBIncrement
另一种方式:如果需要在业务端保留多个版本的更新数据,可以直接将更新和增量数据添加到存档表中,然后在业务端以最晚的时间判断有效数据,这样可以避免每次更新时复杂的计算过程。
更新和删除业务数据
如果业务表有删除,Spark目前没有好的办法支持。需要在业务数据库中记录被删除的关键字段信息,将其与档案表连接,过滤掉连接的数据并覆盖在档案表中,以达到删除的效果。
摘要
在实际的数据开发中,往往需要很多Spark作业来完成数据归档和分析。单个工作流支持配置多个作业并按顺序执行它们。同时使用交互式工作台来验证数据,减少了开发中的诸多不便。目前工作台还在不断优化中,使用中遇到任何不便都可以随时提出建议,方便简化你的数据开发工作。
本文来自果味果冻投稿,不代表舒华文档立场,如若转载,请注明出处:https://www.chinashuhua.cn/24/591160.html