Spark 2.1: reading multi-element tuples (string, double) stored in Hive fails
Published: 2019-06-13


Over the past couple of days a colleague and I have been looking at how to reduce the number of records in a table. Each record contains: objectid (serving cell), gridid (the grid the record belongs to), height, rsrp (serving-cell RSRP), n_objectid (neighbor cell), and n_rsrp (neighbor-cell RSRP).

In the raw data one serving cell maps to many neighbor-cell entries, so the records are grouped and merged in two steps:

1) First group by objectid, gridid, and height, collecting all the neighbor-cell information into arrays;

2) On top of the result of 1), group by objectid and store gridid, height, rsrp, array(n_objectid), and array(n_rsrp) as arrays (a minimal sketch of these two steps follows right after this list).
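The two-step aggregation can be written directly with the DataFrame API. Below is a minimal sketch against a tiny in-memory DataFrame: only the column names come from this article; the sample values, the local SparkSession setup, and the variable names (raw, perGrid, perObject) are made up for illustration and stand in for the Hive table used in the sessions below.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list

val spark = SparkSession.builder().master("local[*]").appName("group-merge-sketch").getOrCreate()
import spark.implicits._

// Toy input: one row per (serving cell, grid, height, neighbor) combination.
val raw = Seq(
  ("100700931", "2676906_708106", "0", "-130.399994", "101", 0.0),
  ("100700931", "2676906_708106", "0", "-130.399994", "102", 2.5),
  ("100700931", "2676906_708107", "5", "-125.100000", "101", 1.0)
).toDF("objectid", "gridid", "height", "rsrp", "n_objectid", "rsrp_dis")

// Step 1: one row per (objectid, gridid, height, rsrp); the neighbor columns become arrays.
val perGrid = raw.groupBy("objectid", "gridid", "height", "rsrp").agg(
  collect_list("n_objectid").alias("n_objectid"),
  collect_list("rsrp_dis").alias("rsrp_dis"))

// Step 2: one row per objectid; every other column becomes an array (or an array of arrays).
val perObject = perGrid.groupBy("objectid").agg(
  collect_list("gridid").alias("gridid"),
  collect_list("height").alias("height"),
  collect_list("rsrp").alias("rsrp"),
  collect_list("n_objectid").alias("n_objectid"),
  collect_list("rsrp_dis").alias("rsrp_dis"))

perObject.printSchema()
perObject.show(false)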

Approach 1: store as array<array<string>>, keeping each field in its own flat array instead of a tuple

[my@sdd983 tommyduan_service]$ /app/my/fi_client/spark2/Spark2x/spark/bin/spark-shell
2018-03-24 14:10:38,583 | WARN  | main | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable | org.apache.hadoop.util.NativeCodeLoader.(NativeCodeLoader.java:62)
2018-03-24 14:10:38,827 | WARN  | main | In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN). | org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
2018-03-24 14:10:38,837 | WARN  | main | Detected deprecated memory fraction settings: [spark.shuffle.memoryFraction, spark.storage.memoryFraction, spark.storage.unrollFraction]. As of Spark 1.6, execution and storage memory management are unified. All memory fractions used in the old model are now deprecated and no longer read. If you wish to use the old memory management, you may explicitly enable `spark.memory.useLegacyMode` (not recommended). | org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
Spark context Web UI available at http://192.168.143.332:23799
Spark context available as 'sc' (master = local[*], app id = local-1521871839949).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_72)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import spark.sql
import spark.sql

scala> import spark.implicits._
import spark.implicits._

scala> sql("use my_hive_db")
2018-03-24 14:10:56,686 | WARN  | main | load mapred-default.xml, HIVE_CONF_DIR env not found! | org.apache.hadoop.hive.ql.session.SessionState.loadMapredDefaultXml(SessionState.java:1101)
2018-03-24 14:10:58,250 | WARN  | main | load mapred-default.xml, HIVE_CONF_DIR env not found! | org.apache.hadoop.hive.ql.session.SessionState.loadMapredDefaultXml(SessionState.java:1101)
res0: org.apache.spark.sql.DataFrame = []

scala> var fpb_df = sql(
     |   s"""|select gridid,height,objectid,n_objectid,rsrp,(rsrp-n_rsrp) as rsrp_dis
     |       |from fpd_tabke
     |       |where p_city=571 and p_day=20180322 limit 50
     |       |""".stripMargin)
fpb_df: org.apache.spark.sql.DataFrame = [gridid: string, height: string ... 4 more fields]

scala> var fpb_groupby_obj_grid_height_df1 = fpb_df.groupBy("objectid", "gridid", "height", "rsrp").agg(
     |   collect_list("n_objectid").alias("n_objectid1"),
     |   collect_list("rsrp_dis").alias("rsrp_dis1")
     | ).select(col("objectid"), col("gridid"), col("height"), col("rsrp"), col("n_objectid1").alias("n_objectid"), col("rsrp_dis1").alias("rsrp_dis"))
fpb_groupby_obj_grid_height_df1: org.apache.spark.sql.DataFrame = [objectid: string, gridid: string ... 4 more fields]

scala> var fpb_groupby_obj_df1 = fpb_groupby_obj_grid_height_df1.groupBy("objectid").agg(
     |   collect_list("gridid").alias("gridid1"),
     |   collect_list("height").alias("height1"),
     |   collect_list("rsrp").alias("rsrp1"),
     |   collect_list("n_objectid").alias("n_objectid1"),
     |   collect_list("rsrp_dis").alias("rsrp_dis1")
     | ).select(col("objectid"), col("gridid1").alias("gridid"), col("height1").alias("height"), col("rsrp1").alias("rsrp"), col("n_objectid1").alias("n_objectid"), col("rsrp_dis1").alias("rsrp_dis"))
fpb_groupby_obj_df1: org.apache.spark.sql.DataFrame = [objectid: string, gridid: array ... 4 more fields]

scala> fpb_groupby_obj_df1.map(s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[String]](4), s.getSeq[Seq[Double]](5))).show
+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       _1|                  _2|                  _3|                  _4|                  _5|                  _6|
+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|100700931|[2676906_708106, ...|[0, 5, 0, 0, 0, 0...|[-130.399994, -12...|[WrappedArray(101...|[WrappedArray(0.0...|
+---------+--------------------+--------------------+--------------------+--------------------+--------------------+

scala> fpb_groupby_obj_df1.map(s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[String]](4), s.getSeq[Seq[Double]](5))).schema
res4: org.apache.spark.sql.types.StructType = StructType(
  StructField(_1,StringType,true),
  StructField(_2,ArrayType(StringType,true),true),
  StructField(_3,ArrayType(StringType,true),true),
  StructField(_4,ArrayType(StringType,true),true),
  StructField(_5,ArrayType(ArrayType(StringType,true),true),true),
  StructField(_6,ArrayType(ArrayType(DoubleType,false),true),true))
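The round trip in approach 1 works because every column in the final tuple is either a string or a (nested) array of primitives, shapes for which Spark derives a product encoder with real ArrayType columns rather than opaque binary. A small sketch to illustrate this, assuming nothing beyond the standard Encoders API (the variable name enc is made up):

import org.apache.spark.sql.{Encoder, Encoders}

// The tuple shape produced by the map over fpb_groupby_obj_df1.
val enc: Encoder[(String, Seq[String], Seq[String], Seq[String], Seq[Seq[String]], Seq[Seq[Double]])] =
  Encoders.product[(String, Seq[String], Seq[String], Seq[String], Seq[Seq[String]], Seq[Seq[Double]])]

// Prints StringType and ArrayType fields (matching the schema shown above), not binary.
println(enc.schema.treeString)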

Approach 2: store in the format array<array<(string, double)>>; reading it back fails.

scala> sql("use my_hive_db")
2018-03-24 14:10:56,686 | WARN  | main | load mapred-default.xml, HIVE_CONF_DIR env not found! | org.apache.hadoop.hive.ql.session.SessionState.loadMapredDefaultXml(SessionState.java:1101)
2018-03-24 14:10:58,250 | WARN  | main | load mapred-default.xml, HIVE_CONF_DIR env not found! | org.apache.hadoop.hive.ql.session.SessionState.loadMapredDefaultXml(SessionState.java:1101)
res0: org.apache.spark.sql.DataFrame = []

scala> var fpb_df = sql(
     |   s"""|select gridid,height,objectid,n_objectid,rsrp,(rsrp-n_rsrp) as rsrp_dis
     |       |from fpd_tabke
     |       |where p_city=571 and p_day=20180322 limit 50
     |       |""".stripMargin)
fpb_df: org.apache.spark.sql.DataFrame = [gridid: string, height: string ... 4 more fields]

scala> var fpb_groupby_obj_grid_height_df2 = fpb_df.map(s =>
     |   (s.getAs[String]("objectid"), s.getAs[String]("gridid"), s.getAs[String]("height"), s.getAs[String]("rsrp"), (s.getAs[String]("n_objectid"), s.getAs[Double]("rsrp_dis")))
     | ).toDF("objectid", "gridid", "height", "rsrp", "neighbour").groupBy("objectid", "gridid", "height", "rsrp").agg(
     |   collect_list("neighbour").alias("neighbour1")
     | ).select(col("objectid"), col("gridid"), col("height"), col("rsrp"), col("neighbour1").alias("neighbour"))

scala> var fpb_groupby_obj_df2 = fpb_groupby_obj_grid_height_df2.groupBy("objectid").agg(
     |   collect_list("gridid").alias("gridid1"),
     |   collect_list("height").alias("height1"),
     |   collect_list("rsrp").alias("rsrp1"),
     |   collect_list("neighbour").alias("neighbour1")
     | ).select(col("objectid"), col("gridid1").alias("gridid"), col("height1").alias("height"), col("rsrp1").alias("rsrp"), col("neighbour1").alias("neighbour"))

scala> val encoder = Encoders.tuple(
     |   Encoders.STRING,
     |   Encoders.javaSerialization[Seq[String]],
     |   Encoders.javaSerialization[Seq[String]],
     |   Encoders.javaSerialization[Seq[String]],
     |   Encoders.javaSerialization[Seq[Seq[(String, Double)]]]
     | )
encoder: org.apache.spark.sql.Encoder[(String, Seq[String], Seq[String], Seq[String], Seq[Seq[(String, Double)]])] = class[_1[0]: string, _2[0]: binary, _3[0]: binary, _4[0]: binary, _5[0]: binary]

scala> fpb_groupby_obj_df2.show
+---------+--------------------+--------------------+--------------------+--------------------+
| objectid|              gridid|              height|                rsrp|           neighbour|
+---------+--------------------+--------------------+--------------------+--------------------+
|100700931|[2676906_708106, ...|[0, 5, 0, 0, 0, 0...|[-130.399994, -12...|[WrappedArray([10...|
+---------+--------------------+--------------------+--------------------+--------------------+

scala> fpb_groupby_obj_df2.map { s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[(String, Double)]](4)) }(encoder).show
+---------+--------------------+--------------------+--------------------+--------------------+
|    value|                  _2|                  _3|                  _4|                  _5|
+---------+--------------------+--------------------+--------------------+--------------------+
|100700931|[AC ED 00 05 73 7...|[AC ED 00 05 73 7...|[AC ED 00 05 73 7...|[AC ED 00 05 73 7...|
+---------+--------------------+--------------------+--------------------+--------------------+

scala> fpb_groupby_obj_df2.map(s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[(String, Double)]](4))).show()
[Stage 6:======================================================>(963 + 1) / 964]
2018-03-24 15:09:09,267 | ERROR | Executor task launch worker for task 3859 | Exception in task 0.0 in stage 7.0 (TID 3859) | org.apache.spark.internal.Logging$class.logError(Logging.scala:91)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:381)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

scala> val encoder = Encoders.tuple(
     |   Encoders.STRING,
     |   Encoders.kryo[Seq[String]],
     |   Encoders.kryo[Seq[String]],
     |   Encoders.kryo[Seq[String]],
     |   Encoders.kryo[Seq[Seq[(String, Double)]]]
     | )
encoder: org.apache.spark.sql.Encoder[(String, Seq[String], Seq[String], Seq[String], Seq[Seq[(String, Double)]])] = class[_1[0]: string, _2[0]: binary, _3[0]: binary, _4[0]: binary, _5[0]: binary]

scala> fpb_groupby_obj_df2.map { s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[(String, Double)]](4)) }(encoder).show
+---------+--------------------+--------------------+--------------------+--------------------+
|    value|                  _2|                  _3|                  _4|                  _5|
+---------+--------------------+--------------------+--------------------+--------------------+
|100700931|[01 00 73 63 61 6...|[01 00 73 63 61 6...|[01 00 73 63 61 6...|[01 00 73 63 61 6...|
+---------+--------------------+--------------------+--------------------+--------------------+
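The root cause is a mismatch between the declared and the stored element type: collect_list over the neighbour tuple column produces array<array<struct<_1:string,_2:double>>>, so every element read back is a GenericRowWithSchema, not a scala.Tuple2, which is exactly what the ClassCastException says; the javaSerialization and kryo encoders only hide the problem by turning each column into opaque binary. One possible workaround (not from the original post; a sketch that continues the shell session above and assumes fpb_groupby_obj_df2 keeps the schema shown there) is to read the nested elements as Row and rebuild genuine tuples by position, so the implicit product encoder never has to cast a Row:

import org.apache.spark.sql.Row

// Rebuild real Scala tuples from the collected struct elements before encoding.
val fixed = fpb_groupby_obj_df2.map { s =>
  val neighbours: Seq[Seq[(String, Double)]] =
    s.getSeq[Seq[Row]](4).map(_.map(r => (r.getString(0), r.getDouble(1))))
  (s.getAs[String]("objectid"),
   s.getSeq[String](1),
   s.getSeq[String](2),
   s.getSeq[String](3),
   neighbours)
}
fixed.show()

An alternative that avoids tuples altogether is to keep the neighbour id and rsrp_dis in two parallel arrays, as approach 1 does, and zip them after reading.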

 

Reposted from: https://www.cnblogs.com/yy3b2007com/p/8638908.html
