Spark SQL and Hive's underlying storage: optimization ideas for slow Spark SQL writes to Hive


Published July 24, 2023

In the earlier post "Analysis of why Spark SQL writes to Hive slowly", the reasons why Spark SQL is slow to write Hive partition files were already examined. Here are several optimization ideas for reference:

(1) Have Spark write the Hive table's underlying partition files directly, then register the partitions with an ADD PARTITION statement, for example:

    s"alter table test_log_hive_text add partition (name_par='${dirName}')"

(2) Have Spark write the files to an HDFS directory, then load the data into Hive with a hive command-line call:

    hive -e "load data inpath '/test/test_log_hive/name_par=test$i' overwrite into table test_log_hive_text partition(name_par='test$i')"

(3) Modify the Spark configuration to point at the desired Hive metastore version and the location of its jars. The Spark source (org.apache.spark.sql.hive.HiveUtils) shows that Spark supports Hive metastore versions 0.12.0 through 2.3.3; the parameters to change are spark.sql.hive.metastore.version and spark.sql.hive.metastore.jars:

    private[spark] object HiveUtils extends Logging {

      def withHiveExternalCatalog(sc: SparkContext): SparkContext = {
        sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive")
        sc
      }

      /** The version of hive used internally by Spark SQL. */
      val builtinHiveVersion: String = "1.2.1"

      val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
        .doc("Version of the Hive metastore. Available options are " +
          s"0.12.0 through 2.3.3.")
        .stringConf
        .createWithDefault(builtinHiveVersion)

      // A fake config which is only here for backward compatibility reasons. This config has no effect
      // to Spark, just for reporting the builtin Hive version of Spark to existing applications that
      // already rely on this config.
      val FAKE_HIVE_VERSION = buildConf("spark.sql.hive.version")
        .doc(s"deprecated, please use ${HIVE_METASTORE_VERSION.key} to get the Hive version in Spark.")
        .stringConf
        .createWithDefault(builtinHiveVersion)

      val HIVE_METASTORE_JARS = buildConf("spark.sql.hive.metastore.jars")
        .doc(s"""
          | Location of the jars that should be used to instantiate the HiveMetastoreClient.
          | This property can be one of three options: "
          | 1. "builtin"
          |   Use Hive ${builtinHiveVersion}, which is bundled with the Spark assembly when
          |   -Phive is enabled. When this option is chosen,
          |   spark.sql.hive.metastore.version must be either
          |   ${builtinHiveVersion} or not defined.
          | 2. "maven"
          |   Use Hive jars of specified version downloaded from Maven repositories.
          | 3. A classpath in the standard format for both Hive and Hadoop.
          """.stripMargin)
        .stringConf
        .createWithDefault("builtin")
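As a minimal sketch of approach (3), the two parameters could be set when building the session. The values below are only illustrative (Hive 2.3.3, the "maven" jar source, and the sample local classpath mentioned in the comment are not taken from the original post):

    import org.apache.spark.sql.SparkSession

    // Minimal sketch for approach (3): pin the Hive metastore version and tell Spark
    // where to find the matching client jars. Version and paths here are examples only.
    object MetastoreVersionExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("MetastoreVersionExample")
          // must fall inside the supported 0.12.0 - 2.3.3 range
          .config("spark.sql.hive.metastore.version", "2.3.3")
          // "maven" downloads matching jars at startup; alternatively pass a local
          // classpath such as "/opt/hive-2.3.3/lib/*" (illustrative path)
          .config("spark.sql.hive.metastore.jars", "maven")
          .enableHiveSupport()
          .getOrCreate()

        spark.sql("show databases").show()
        spark.stop()
      }
    }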
Based on my actual requirements I adopted the second approach. The actual scenario: Oracle GoldenGate reads upstream database change logs in real time and pushes them to Kafka; a stream-processing job saves the change logs to an HBase table in real time; the HBase table merges each day's operation logs to produce a T-1 snapshot table; Spark then reads the HBase table and syncs the data to the offline warehouse for offline analysis (HBase is mainly used to handle updates and deletes). Below is a demo:

    import java.net.URI
    import java.util

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Result, Scan}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.types.{DataTypes, StringType, StructField}
    import org.apache.spark.sql.{RowFactory, SparkSession}
    import org.slf4j.LoggerFactory

    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer

    object HbaseToHive {

      val log = LoggerFactory.getLogger(this.getClass)

      // private val hdfsPath = "/user/hive/warehouse//test_log_hive_text"
      private val hdfsPath = "/test/test_log_hive"

      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
          .setAppName(s"${this.getClass.getSimpleName}")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          // the next three compression-related keys are reconstructed; the exact
          // names were garbled in the original post
          .set("spark.rdd.compress", "true")
          .set("spark.shuffle.compress", "true")
          .set("spark.broadcast.compress", "false")
          // .set("spark.io.compression.codec", "CompressionCodec")
          .registerKryoClasses(Array(classOf[ImmutableBytesWritable]))

        val spark = SparkSession.builder()
          .config(sparkConf)
          .appName(s"${this.getClass.getSimpleName}")
          .enableHiveSupport()
          .getOrCreate()

        // configure the HBase scan over the snapshot table
        val conf = HBaseConfiguration.create()
        // conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181")
        conf.set("hbase.zookeeper.quorum", "30.4.137.224:2181,30.4.137.228:2181,30.4.137.229:2181")
        conf.set(TableInputFormat.INPUT_TABLE, "test:test_log_hive")

        val scan = new Scan()
        val proto = ProtobufUtil.toScan(scan)
        conf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray))

        val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(
          conf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[Result])

        // schema of the target Hive table
        val list = new util.ArrayList[StructField]()
        val rowKey = StructField("rowKey", StringType, true)
        val name = StructField("name", StringType, true)
        val age = StructField("age", StringType, true)
        val mobile = StructField("mobile", StringType, true)
        val addr = StructField("addr", StringType, true)
        list.add(rowKey)
        list.add(name)
        list.add(age)
        list.add(mobile)
        list.add(addr)
        val schema = DataTypes.createStructType(list)

        // convert each HBase Result into a Row
        val mapHbaseRDD = hBaseRDD.map(x => {
          val result = x._2
          val rowKey = Bytes.toString(result.getRow)
          val name = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))
          val age = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"))
          val mobile = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("mobile"))
          val addr = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("addr"))
          RowFactory.create(rowKey, Bytes.toString(name), Bytes.toString(age),
            Bytes.toString(mobile), Bytes.toString(addr))
        })

        val df = spark.createDataFrame(mapHbaseRDD, schema)

        // write one directory per partition value (the "name" column) under hdfsPath
        df.rdd.map(r => {
          (r.getString(1), r.mkString(","))
        }).repartition(3)
          .saveAsHadoopFile(hdfsPath, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])

        // generate the "load data" statements, 30 per group, and save them as a script file
        val dirs = getDirs(hdfsPath)
        val loadSql = dirs.map(dir => {
          val dirNames = dir.split("/")
          val dirName = dirNames(dirNames.length - 1)
          s"load data inpath '${dir}' overwrite into table test_log_hive_text partition (name_par='${dirName}')"
        })
        val loadSqlMap = spliceList(loadSql.toList, 30)
        val loadSqlGroups = new ArrayBuffer[String]
        loadSqlMap.foreach(x => {
          loadSqlGroups += x._2.mkString(";") + System.lineSeparator()
        })
        spark.sparkContext.makeRDD(loadSqlGroups).repartition(1).saveAsTextFile(hdfsPath + "/" + "load_sql")

        // alternative approaches (add-partition / dynamic-partition insert), commented out:
        // spark.sql("use legend")
        // spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
        // spark.sql("create table test_log_hive_text(rowKey STRING, name STRING, age STRING, mobile STRING, addr " +
        //   "STRING) partitioned by(name_par STRING) row format delimited fields terminated by ','")
        //
        // for (dirPath <- dirs) {
        //   val dirNames = dirPath.split("/")
        //   val dirName = dirNames(dirNames.length - 1).split("=")(1)
        //   spark.sql(s"alter table test_log_hive_text add partition (name_par='${dirName}')")
        // }
        //
        // df.repartition(5).createOrReplaceTempView("result")
        // spark.sql("use legend")
        // spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
        // spark.sql("insert into test_log_hive partition(name_par) " +
        //   "select rowKey,name,age,mobile,addr,name as name_par from result")
        // df.write.mode(SaveMode.Overwrite).format("parquet").partitionBy("name").insertInto("test_log")
        //
        // spark.sql("load data inpath '/test/test_log_hive' OVERWRITE INTO TABLE test_log_hive_text PARTITION " +
        //   "(create_day='2019-04-28')")
        // spark.sql("insert overwrite table test_log_hive_orc PARTITION(name_par) select rowKey,name,age,mobile," +
        //   "addr,name as name_par from test_log_hive_text where create_day='2019-04-28'")
      }

      // list the first-level directories under a path
      def getDirs(path: String): Array[String] = {
        getFilesAndDirs(path).filter(getHdfs(path).getFileStatus(_).isDirectory).map(_.toString)
      }

      // list the first-level files and directories under a path
      def getFilesAndDirs(path: String): Array[Path] = {
        val fs = getHdfs(path).listStatus(new Path(path))
        FileUtil.stat2Paths(fs)
      }

      // create a FileSystem handle
      def getHdfs(path: String): FileSystem = {
        val conf = new Configuration()
        FileSystem.get(URI.create(path), conf)
      }

      /**
       * split a list into groups of splitSize elements
       *
       * @param datas
       * @param splitSize
       * @return
       */
      def spliceList(datas: List[String], splitSize: Int): mutable.HashMap[String, List[String]] = {
        if (datas == null || splitSize < 1) return null
        val totalSize = datas.size
        val count = if (totalSize % splitSize == 0) totalSize / splitSize
          else totalSize / splitSize + 1
        val map = new mutable.HashMap[String, List[String]]()
        for (i <- 0 until count) {
          val cols = datas.slice(i * splitSize, if (i == count - 1) totalSize else splitSize * (i + 1))
          map(i.toString) = cols
        }
        map
      }
    }

    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

    class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {

      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
        // "name_par=" + key + "/" + name
        key + "/" + name
      }

      override def generateActualKey(key: Any, value: Any): String = {
        null
      }
    }

The demo's main job is to read the HBase data, write it out as one HDFS directory per partition-field value, and finally load the data into the Hive table with a hive command-line script.
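If approach (1) is preferred over generating a load script, the driver-side step could instead register the written directories as partitions directly. The sketch below reuses the demo's getDirs helper and assumes details not spelled out above: the table lives in the legend database, is partitioned by name_par, and the output format used the commented-out "name_par=" + key directory naming:

    import org.apache.spark.sql.SparkSession

    // Sketch of approach (1): register the directories written by saveAsHadoopFile as
    // partitions of an existing table, instead of emitting a load-sql script.
    // Assumes directories are laid out as <hdfsPath>/name_par=<value> and that
    // legend.test_log_hive_text already exists, partitioned by name_par (assumptions).
    object AddPartitionsExample {
      def addPartitions(spark: SparkSession, hdfsPath: String): Unit = {
        spark.sql("use legend")
        HbaseToHive.getDirs(hdfsPath).foreach { dir =>
          val parValue = dir.split("/").last.split("=")(1) // "name_par=test1" -> "test1"
          spark.sql(
            s"alter table test_log_hive_text add if not exists " +
              s"partition (name_par='$parValue') location '$dir'")
        }
      }
    }

Compared with the load-script route, this keeps the whole flow inside the Spark job, but it leaves the files where Spark wrote them, so the directories must sit in a location the Hive table is allowed to reference.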
