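Over the past couple of days a colleague and I have been working out how to reduce the number of records in a table. Each record contains: objectid (serving cell), gridid (the grid it belongs to), height, rsrp (serving-cell RSRP), n_objectid (neighbour cell) and n_rsrp (neighbour-cell RSRP).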
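A serving cell has several neighbour-cell records. We merge them by grouping in two steps:
1) Group by objectid, gridid and height, and collect all neighbour-cell information into collections. The code below also includes rsrp in the grouping key.
2) On top of the result of 1), group by objectid and store gridid, height, rsrp, array(n_objectid) and array(n_rsrp) as collections. In the code, the neighbour value that is collected is rsrp_dis = rsrp - n_rsrp rather than n_rsrp itself.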
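The two grouping steps can be mirrored on an in-memory Scala collection to make the target shape concrete. This is only a sketch, not the Spark job itself. The names `Rec`, `GridGroup` and `TwoLevelGrouping` are hypothetical. The sketch follows the description above: step 1 groups by objectid, gridid and height, and it collects n_rsrp. It takes rsrp from the first record of each group, on the assumption that the serving-cell value is constant within a group.

```scala
// Hypothetical record mirroring the table columns described above.
final case class Rec(
    objectid: String,
    gridid: String,
    height: String,
    rsrp: Double,
    nObjectid: String,
    nRsrp: Double)

// Result of step 1: one entry per (objectid, gridid, height) with neighbour lists.
final case class GridGroup(
    gridid: String,
    height: String,
    rsrp: Double,
    nObjectids: Seq[String],
    nRsrps: Seq[Double])

object TwoLevelGrouping {
  def group(recs: Seq[Rec]): Map[String, Seq[GridGroup]] = {
    // Step 1: group by (objectid, gridid, height) and collect neighbour ids and RSRPs.
    val step1: Seq[(String, GridGroup)] =
      recs.groupBy(r => (r.objectid, r.gridid, r.height)).toSeq.map {
        case ((obj, grid, h), rs) =>
          // Assumption: rsrp is the serving cell's value and is constant within the group.
          obj -> GridGroup(grid, h, rs.head.rsrp, rs.map(_.nObjectid), rs.map(_.nRsrp))
      }
    // Step 2: group the step-1 results by objectid.
    step1.groupBy(_._1).map { case (obj, pairs) => obj -> pairs.map(_._2) }
  }
}
```

Each objectid ends up with a single entry whose value holds all of its grids. That is the reduction in record count the post is after.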
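Approach 1: store the neighbour data as nested arrays of primitives (array<array<string>>), keeping each output row a flat, single-level tuple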
[my@sdd983 tommyduan_service]$ /app/my/fi_client/spark2/Spark2x/spark/bin/spark-shell
Spark context available as 'sc' (master = local[*], app id = local-1521871839949).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_72)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import spark.sql
import spark.sql

scala> import spark.implicits._
import spark.implicits._

scala> sql("use my_hive_db")
res0: org.apache.spark.sql.DataFrame = []

scala> var fpb_df = sql(
     |   s"""|select gridid,height,objectid,n_objectid,rsrp,(rsrp-n_rsrp) as rsrp_dis
     |       |from fpd_tabke
     |       |where p_city=571 and p_day=20180322 limit 50
     |       |""".stripMargin)
fpb_df: org.apache.spark.sql.DataFrame = [gridid: string, height: string ... 4 more fields]

scala> var fpb_groupby_obj_grid_height_df1 = fpb_df.groupBy("objectid", "gridid", "height", "rsrp").agg(
     |   collect_list("n_objectid").alias("n_objectid1"),
     |   collect_list("rsrp_dis").alias("rsrp_dis1")
     | ).select(col("objectid"), col("gridid"), col("height"), col("rsrp"), col("n_objectid1").alias("n_objectid"), col("rsrp_dis1").alias("rsrp_dis"))
fpb_groupby_obj_grid_height_df1: org.apache.spark.sql.DataFrame = [objectid: string, gridid: string ... 4 more fields]

scala> var fpb_groupby_obj_df1 = fpb_groupby_obj_grid_height_df1.groupBy("objectid").agg(
     |   collect_list("gridid").alias("gridid1"),
     |   collect_list("height").alias("height1"),
     |   collect_list("rsrp").alias("rsrp1"),
     |   collect_list("n_objectid").alias("n_objectid1"),
     |   collect_list("rsrp_dis").alias("rsrp_dis1")
     | ).select(col("objectid"), col("gridid1").alias("gridid"), col("height1").alias("height"), col("rsrp1").alias("rsrp"), col("n_objectid1").alias("n_objectid"), col("rsrp_dis1").alias("rsrp_dis"))
fpb_groupby_obj_df1: org.apache.spark.sql.DataFrame = [objectid: string, gridid: array<string> ... 4 more fields]

scala> fpb_groupby_obj_df1.map(s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[String]](4), s.getSeq[Seq[Double]](5))).show
+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       _1|                  _2|                  _3|                  _4|                  _5|                  _6|
+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|100700931|[2676906_708106, ...|[0, 5, 0, 0, 0, 0...|[-130.399994, -12...|[WrappedArray(101...|[WrappedArray(0.0...|
+---------+--------------------+--------------------+--------------------+--------------------+--------------------+

scala> fpb_groupby_obj_df1.map(s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[String]](4), s.getSeq[Seq[Double]](5))).schema
res4: org.apache.spark.sql.types.StructType = StructType(StructField(_1,StringType,true), StructField(_2,ArrayType(StringType,true),true), StructField(_3,ArrayType(StringType,true),true), StructField(_4,ArrayType(StringType,true),true), StructField(_5,ArrayType(ArrayType(StringType,true),true),true), StructField(_6,ArrayType(ArrayType(DoubleType,false),true),true))
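Approach 1 stores each grid's neighbour ids and rsrp differences in two parallel arrays, n_objectid and rsrp_dis. A consumer therefore has to zip them back into pairs. The sketch below shows this in plain Scala, using a hypothetical `ParallelArrays.neighbourPairs` helper.

The sketch assumes that the two `collect_list` calls in the same `agg` see the rows in the same order. This holds in practice because both aggregates consume the same input rows, but the Spark documentation does not state it as a guarantee. Also note that `collect_list` skips nulls. A null in only one of the two columns would therefore shift the arrays out of alignment.

```scala
object ParallelArrays {
  // Rebuild (neighbour id, rsrp difference) pairs from the two parallel nested arrays.
  // Element i of nObjectids belongs to the same grid as element i of rsrpDis.
  def neighbourPairs(
      nObjectids: Seq[Seq[String]],
      rsrpDis: Seq[Seq[Double]]): Seq[Seq[(String, Double)]] = {
    require(nObjectids.size == rsrpDis.size, "both columns need one inner array per grid")
    nObjectids.zip(rsrpDis).map { case (ids, dis) =>
      // Fail loudly instead of silently truncating when the inner arrays differ in length.
      require(ids.size == dis.size, "inner arrays are out of alignment")
      ids.zip(dis)
    }
  }
}
```

In the Scala 2.11 shell session above, the helper could be fed with `s.getSeq[Seq[String]](4)` and `s.getSeq[Seq[Double]](5)` inside the same `map`.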
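Approach 2: store each neighbour as a (string, double) pair, i.e. array<array<(string,double)>>. Reading it back fails.
The pairs end up in a struct<_1:string,_2:double> column. When a DataFrame row is read through Row, each struct comes back as a GenericRowWithSchema rather than a scala.Tuple2. s.getSeq[Seq[(String, Double)]](4) is only an unchecked cast, so nothing fails until the default tuple encoder tries to treat the elements as Tuple2. That is where the ClassCastException below comes from.

Supplying Encoders.javaSerialization or Encoders.kryo avoids the exception only because those encoders serialize the whole object into an opaque binary column. These are the [AC ED 00 05 ...] and [01 00 ...] values in the output. The result can no longer be queried as ordinary columns.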
scala> sql("use my_hive_db")
res0: org.apache.spark.sql.DataFrame = []

scala> var fpb_df = sql(
     |   s"""|select gridid,height,objectid,n_objectid,rsrp,(rsrp-n_rsrp) as rsrp_dis
     |       |from fpd_tabke
     |       |where p_city=571 and p_day=20180322 limit 50
     |       |""".stripMargin)
fpb_df: org.apache.spark.sql.DataFrame = [gridid: string, height: string ... 4 more fields]

scala> var fpb_groupby_obj_grid_height_df2 = fpb_df.map(s => (s.getAs[String]("objectid"), s.getAs[String]("gridid"), s.getAs[String]("height"), s.getAs[String]("rsrp"), (s.getAs[String]("n_objectid"), s.getAs[Double]("rsrp_dis")))).toDF("objectid", "gridid", "height", "rsrp", "neighbour").groupBy("objectid", "gridid", "height", "rsrp").agg(
     |   collect_list("neighbour").alias("neighbour1")
     | ).select(col("objectid"), col("gridid"), col("height"), col("rsrp"), col("neighbour1").alias("neighbour"))

scala> var fpb_groupby_obj_df2 = fpb_groupby_obj_grid_height_df2.groupBy("objectid").agg(
     |   collect_list("gridid").alias("gridid1"),
     |   collect_list("height").alias("height1"),
     |   collect_list("rsrp").alias("rsrp1"),
     |   collect_list("neighbour").alias("neighbour1")
     | ).select(col("objectid"), col("gridid1").alias("gridid"), col("height1").alias("height"), col("rsrp1").alias("rsrp"), col("neighbour1").alias("neighbour"))

scala> val encoder = Encoders.tuple(
     |   Encoders.STRING,
     |   Encoders.javaSerialization[Seq[String]],
     |   Encoders.javaSerialization[Seq[String]],
     |   Encoders.javaSerialization[Seq[String]],
     |   Encoders.javaSerialization[Seq[Seq[(String, Double)]]]
     | )
encoder: org.apache.spark.sql.Encoder[(String, Seq[String], Seq[String], Seq[String], Seq[Seq[(String, Double)]])] = class[_1[0]: string, _2[0]: binary, _3[0]: binary, _4[0]: binary, _5[0]: binary]

scala> fpb_groupby_obj_df2.show
+---------+--------------------+--------------------+--------------------+--------------------+
| objectid|              gridid|              height|                rsrp|           neighbour|
+---------+--------------------+--------------------+--------------------+--------------------+
|100700931|[2676906_708106, ...|[0, 5, 0, 0, 0, 0...|[-130.399994, -12...|[WrappedArray([10...|
+---------+--------------------+--------------------+--------------------+--------------------+

scala> fpb_groupby_obj_df2.map { s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[(String, Double)]](4)) }(encoder).show
+---------+--------------------+--------------------+--------------------+--------------------+
|    value|                  _2|                  _3|                  _4|                  _5|
+---------+--------------------+--------------------+--------------------+--------------------+
|100700931|[AC ED 00 05 73 7...|[AC ED 00 05 73 7...|[AC ED 00 05 73 7...|[AC ED 00 05 73 7...|
+---------+--------------------+--------------------+--------------------+--------------------+

scala> fpb_groupby_obj_df2.map(s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[(String, Double)]](4))).show()
[Stage 6:======================================================>(963 + 1) / 964]
2018-03-24 15:09:09,267 | ERROR | Executor task launch worker for task 3859 | Exception in task 0.0 in stage 7.0 (TID 3859) | org.apache.spark.internal.Logging$class.logError(Logging.scala:91)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:381)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

scala> val encoder = Encoders.tuple(
     |   Encoders.STRING,
     |   Encoders.kryo[Seq[String]],
     |   Encoders.kryo[Seq[String]],
     |   Encoders.kryo[Seq[String]],
     |   Encoders.kryo[Seq[Seq[(String, Double)]]]
     | )
encoder: org.apache.spark.sql.Encoder[(String, Seq[String], Seq[String], Seq[String], Seq[Seq[(String, Double)]])] = class[_1[0]: string, _2[0]: binary, _3[0]: binary, _4[0]: binary, _5[0]: binary]

scala> fpb_groupby_obj_df2.map { s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[(String, Double)]](4)) }(encoder).show
+---------+--------------------+--------------------+--------------------+--------------------+
|    value|                  _2|                  _3|                  _4|                  _5|
+---------+--------------------+--------------------+--------------------+--------------------+
|100700931|[01 00 73 63 61 6...|[01 00 73 63 61 6...|[01 00 73 63 61 6...|[01 00 73 63 61 6...|
+---------+--------------------+--------------------+--------------------+--------------------+
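One way around the ClassCastException above is to read the nested structs as `Row` and convert them to tuples explicitly. The sketch below assumes a standalone Spark application rather than the shell, and it was not run against the original table. The object name `NestedNeighbourDemo`, the `rowsToPairs` helper and the toy rows are all hypothetical.

After the conversion, the ordinary tuple encoder from `spark.implicits._` can serialize `Seq[Seq[(String, Double)]]` as a real array<array<struct>> column instead of a binary blob.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.collect_list

object NestedNeighbourDemo {
  // Inside a Row, array<struct<_1:string,_2:double>> holds Row elements, not Tuple2,
  // so each struct is converted field by field.
  def rowsToPairs(rows: scala.collection.Seq[Row]): Seq[(String, Double)] =
    rows.map(r => (r.getString(0), r.getDouble(1))).toList

  def run(spark: SparkSession): Array[(String, Seq[Seq[(String, Double)]])] = {
    import spark.implicits._

    // Hypothetical toy rows: (objectid, gridid, n_objectid, rsrp_dis).
    val raw = Seq(
      ("cellA", "g1", "n1", 1.5),
      ("cellA", "g1", "n2", 2.0),
      ("cellA", "g2", "n3", 0.5)
    ).toDF("objectid", "gridid", "n_objectid", "rsrp_dis")

    // Step 1: pack each neighbour as a struct and collect per (objectid, gridid).
    val step1 = raw
      .map(r => (r.getString(0), r.getString(1), (r.getString(2), r.getDouble(3))))
      .toDF("objectid", "gridid", "neighbour")
      .groupBy("objectid", "gridid")
      .agg(collect_list("neighbour").alias("neighbour"))

    // Step 2: collect the per-grid lists per objectid, giving array<array<struct>>.
    val step2 = step1
      .groupBy("objectid")
      .agg(collect_list("neighbour").alias("neighbour"))

    // Read the nested structs as Rows, convert them, and let the product encoder
    // serialize the resulting Seq[Seq[(String, Double)]].
    step2.map { r =>
      val nested = r.getSeq[scala.collection.Seq[Row]](1).map(rowsToPairs)
      (r.getString(0), nested)
    }.collect()
  }
}
```

`collect_list` does not guarantee element order after a shuffle. Code that consumes the result should not rely on the order of grids or neighbours.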