Spark in Practice: Computing Address-Book Similarity

xiaoxiao, 2021-02-28

Requirement:

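A Hive table stores two fields, UserPhone and LinkPhone (one row per contact in a user's address book). Using Spark SQL, find every pair of UserPhones whose address-book similarity is >= 80% and output those records.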

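Similarity(A, B) = |A ∩ B| / |A|, i.e. the number of contacts A and B have in common divided by the size of A's address book. Because the denominator is A's book size, the measure is asymmetric: Similarity(A, B) and Similarity(B, A) differ whenever the two books have different sizes.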
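A minimal sketch of the definition on plain Scala sets, assuming each address book is already a deduplicated set of contact phones. The object and method names are hypothetical and are not part of the Spark job.

```scala
object SimilarityExample {
  // Similarity of A with respect to B: |A ∩ B| / |A| (0.0 for an empty book A)
  def similarity(a: Set[String], b: Set[String]): Double =
    if (a.isEmpty) 0.0 else (a intersect b).size.toDouble / a.size
}
```

For A = {p1..p5} and B = {p1..p4, p6..p11} the books share 4 contacts, so similarity(A, B) = 4/5 = 0.8 while similarity(B, A) = 4/10 = 0.4. A passes the 80% threshold with respect to B but B does not with respect to A, which is why the job emits both directions for every pair.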

pom file

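Pay attention to compatibility between dependencies and pick matching versions: Spark 2.2.0 is built against Scala 2.11, hence the _2.11 artifacts. Because the job calls enableHiveSupport(), the spark-hive module must also be on the classpath. In addition, you will usually copy Hive's conf/hive-site.xml into the IDEA project (for example under src/main/resources, so it ends up on the classpath) so that Spark can find the Hive metastore.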

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sowhat.demo</groupId>
    <artifactId>PhoneBookSimilaryCal</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--
    <properties>
        <mysql.version>6.0.5</mysql.version>
        <spring.version>4.3.6.RELEASE</spring.version>
        <spring.data.jpa.version>1.11.0.RELEASE</spring.data.jpa.version>
        <log4j.version>1.2.17</log4j.version>
        <quartz.version>2.2.3</quartz.version>
        <slf4j.version>1.7.22</slf4j.version>
        <hibernate.version>5.2.6.Final</hibernate.version>
        <camel.version>2.18.2</camel.version>
        <config.version>1.10</config.version>
        <jackson.version>2.8.6</jackson.version>
        <servlet.version>3.0.1</servlet.version>
        <net.sf.json.version>2.4</net.sf.json.version>
        <activemq.version>5.14.3</activemq.version>
        <spark.version>2.1.1</spark.version>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>
    -->

    <properties>
        <scala.version>2.11.8</scala.version>
        <!-- binary (compat) version, i.e. the _2.11 suffix of the Spark artifacts -->
        <scala.compat.version>2.11</scala.compat.version>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>2.7.2</hadoop.version>
        <hbase.version>1.0</hbase.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <!-- required by SparkSession.builder().enableHiveSupport() -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
    </dependencies>

    <build>
        <finalName>PhoneBookSimilaryCal</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <!-- compiles *.scala sources, so there is no need to add the Scala framework to the project by hand in IDEA -->
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.sowhat.PhoneBookSimilaryCal</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- any name -->
                        <phase>package</phase> <!-- bind to the package lifecycle phase -->
                        <goals>
                            <goal>single</goal> <!-- the assembly plugin's single goal -->
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Spark code:

package com.sowhat

/**
 * @author sowhat
 * @create 2020-07-02 16:30
 */

import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.slf4j.{Logger, LoggerFactory}

object PhoneBookSimilaryCal {

  private val logger: Logger = LoggerFactory.getLogger(PhoneBookSimilaryCal.getClass)

  // Lower-case hex MD5 of a string (utility; not used by the job itself)
  def MD5(input: String): String = {
    val md5 = MessageDigest.getInstance("MD5")
    md5.digest(input.getBytes(StandardCharsets.UTF_8)).map(b => f"${b & 0xff}%02x").mkString
  }

  // The date `amount` units of `field` before today, formatted as yyyy-MM-dd
  private def dateBefore(field: Int, amount: Int): String = {
    val cal = Calendar.getInstance()
    cal.add(field, -amount)
    new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime)
  }

  def Yesterday: String = dateBefore(Calendar.DATE, 1) // written to etl_date, a DATE column
  def OneYearBefore: String = dateBefore(Calendar.YEAR, 1)
  def SixMonthBefore: String = dateBefore(Calendar.MONTH, 6)
  def ThreeMonthBefore: String = dateBefore(Calendar.MONTH, 3)
  def OneMonthBefore: String = dateBefore(Calendar.MONTH, 1)

  def main(args: Array[String]): Unit = {
    // Spark SQL reads and writes HDFS, so run as a Hadoop user with permission on the Hive tables
    System.setProperty("HADOOP_USER_NAME", "yjy_research")

    // Executor/driver resources and spark.sql.shuffle.partitions come from spark-submit:
    // anything set here with .config(...) would take precedence over the spark-submit flags.
    val spark: SparkSession = SparkSession.builder()
      .appName("phoneBookSimilaryCal")
      .config("spark.default.parallelism", "3000")
      .config("spark.driver.maxResultSize", "40g")
      //.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.shuffle.io.maxRetries", "20")
      .config("spark.shuffle.io.retryWait", "10s")
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .enableHiveSupport() // enable Hive tables (needs spark-hive and hive-site.xml)
      .getOrCreate()

    spark.sql("use dm_kg")

    // DISTINCT: the same (user_phone, phone) pair can occur in several etl_date partitions;
    // duplicates would inflate book sizes and make combinations(2) emit self-pairs.
    val sqlText: String =
      "select distinct user_phone, phone from user_phone_with_phone_message " +
        "where user_phone not in ('59400a197e9bf5fbb2fbee0456b66cd6', 'f7e82e195810a01688db2eeecb8e56c9') " +
        s"and etl_date > '$SixMonthBefore'"
    println(sqlText)
    val df: DataFrame = spark.sql(sqlText)

    // (userPhone, linkPhone)
    val userPhone_Phone: RDD[(String, String)] = df.rdd.map(row => (row.getString(0), row.getString(1)))
    userPhone_Phone.persist(StorageLevel.MEMORY_AND_DISK_SER)

    // (userPhone, address-book size)
    val userPhone_num: RDD[(String, Long)] = userPhone_Phone.mapValues(_ => 1L).reduceByKey(_ + _, 3000)

    // (userPhone, "userPhone_size"): the size travels with the id (MD5 hex ids contain no '_')
    val userPhone_userPhoneNum: RDD[(String, String)] =
      userPhone_num.map { case (u, n) => (u, s"${u}_$n") }

    // (userPhone, (linkPhone, "userPhone_size")), dropping users with a single contact
    val userPhone_Phone_userPhoneNum_filter: RDD[(String, (String, String))] =
      userPhone_Phone.join(userPhone_userPhoneNum, 3000)
        .filter(_._2._2.split("_")(1).toLong != 1)

    // (linkPhone, "userPhone_size")
    val phone_userPhoneNum: RDD[(String, String)] = userPhone_Phone_userPhoneNum_filter.map(_._2)

    // Result: (linkPhone, (list of "userPhone_size" holding it, list size))
    val phone_userPhoneListWithSize: RDD[(String, (List[String], Int))] = phone_userPhoneNum.combineByKey(
      (x: String) => (List(x), 1),
      (old: (List[String], Int), x: String) => (x :: old._1, old._2 + 1),
      (p1: (List[String], Int), p2: (List[String], Int)) => (p1._1 ::: p2._1, p1._2 + p2._2)
    )

    // Keep link phones held by 2..1499 users: a phone held by k users yields k(k-1)/2 pairs
    val userPhoneList: RDD[List[String]] =
      phone_userPhoneListWithSize.filter(x => x._2._2 > 1 && x._2._2 < 1500).map(_._2._1)

    // All unordered user pairs sharing this link phone; sorting makes (A, B) and (B, A) one key
    // https://blog.csdn.net/aomao4913/article/details/101274895
    val userPhone_userPhone: RDD[List[String]] = userPhoneList.flatMap(_.sorted.combinations(2))

    // ((userPhone1_size1, userPhone2_size2), commonNum)
    val userPhone_userPhone_Num: RDD[((String, String), Int)] =
      userPhone_userPhone.map(x => ((x(0), x(1)), 1)).reduceByKey(_ + _, 3000)

    // Split "id_size" apart and emit both directions, because similarity is asymmetric:
    // (userPhoneA, userPhoneB, userPhoneABookNum, commonNum)
    val userPhone1_userPhone2_userPhone1BookNum_CommonNum: RDD[(String, String, Long, Int)] =
      userPhone_userPhone_Num.flatMap { case ((a, b), common) =>
        val Array(idA, sizeA) = a.split("_")
        val Array(idB, sizeB) = b.split("_")
        Seq((idA, idB, sizeA.toLong, common), (idB, idA, sizeB.toLong, common))
      }.filter(_._3 > 1)

    // (user_phone1, percent, user_phone2, label, userPhone1BookNum, calDate)
    // percent = commonNum * 100 / userPhone1BookNum (integer division)
    val calDate: String = Yesterday // computed once on the driver
    val result: RDD[(String, Long, String, String, Long, String)] =
      userPhone1_userPhone2_userPhone1BookNum_CommonNum
        .map { case (u1, u2, size1, common) => (u1, common * 100 / size1, u2, "Similar_phoneBook", size1, calDate) }
        .filter(_._2 >= 80)
        .persist(StorageLevel.MEMORY_AND_DISK_SER)

    import spark.implicits._
    val finalResult: DataFrame = result.toDF()

    // count() instead of collect().length: no need to ship every row to the driver
    println("result count: " + result.count())

    spark.sql("drop table if exists sowhat_similar_phonebook_result")
    spark.sql("CREATE TABLE IF NOT EXISTS sowhat_similar_phonebook_result" +
      "(startId string comment 'start node ID'," +
      "similar_percent string comment 'similarity'," +
      "endId string comment 'end node ID'," +
      "type string comment 'edge type'," +
      "telbook_num bigint comment 'address-book size'," +
      "etl_date Date comment 'ETL date') " +
      "row format delimited fields terminated by ',' ")
    logger.info("created table sowhat_similar_phonebook_result")

    finalResult.createOrReplaceTempView("resultMessage")
    spark.sql("insert into sowhat_similar_phonebook_result select * from resultMessage")
    spark.sql("select count(1) from sowhat_similar_phonebook_result").show()
    spark.stop()
  }
}
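To check the logic without a cluster, here is a hedged local sketch that mirrors the RDD pipeline above on ordinary Scala collections, with groupBy standing in for reduceByKey, join and combineByKey. The object name, method name and parameters are hypothetical; the 1500-holder cap, the single-contact filter and the 80% threshold are taken from the job. It deduplicates the input records first because combinations(2) treats repeated elements as distinct occurrences: List("A", "A", "B").combinations(2) yields List(A, A) as well as List(A, B), i.e. a self-pair.

```scala
object PhoneBookPipelineSketch {
  // records: (userPhone, linkPhone); returns (userA, userB, percent), percent = common * 100 / |A|
  def similarPairs(records: Seq[(String, String)],
                   maxHolders: Int = 1500,
                   threshold: Long = 80): Seq[(String, String, Long)] = {
    val pairs = records.distinct

    // Address-book size per user (the userPhone_num step)
    val bookSize: Map[String, Long] =
      pairs.groupBy(_._1).map { case (u, rs) => u -> rs.size.toLong }

    // Inverted index: link phone -> users holding it, skipping users with a single contact
    val holders: Map[String, Seq[String]] =
      pairs.filter { case (u, _) => bookSize(u) != 1 }
        .groupBy(_._2)
        .map { case (phone, rs) => phone -> rs.map(_._1) }

    // Each shared link phone adds 1 to every unordered pair of its holders
    val common: Map[(String, String), Int] =
      holders.values.toSeq
        .filter(us => us.size > 1 && us.size < maxHolders)
        .flatMap(_.sorted.combinations(2).map(p => (p(0), p(1))))
        .groupBy(identity)
        .map { case (pair, hits) => pair -> hits.size }

    // Emit both directions, since the denominator is the first user's book size
    common.toSeq.flatMap { case ((a, b), n) =>
      Seq((a, b, n * 100L / bookSize(a)), (b, a, n * 100L / bookSize(b)))
    }.filter(_._3 >= threshold).sortBy(r => (r._1, r._2))
  }
}
```

Like the job, the sketch uses integer division, so 79.9% becomes 79 and is filtered out. The holder cap matters for cost: a link phone held by k users produces k(k-1)/2 candidate pairs, which at k = 1499 is already 1,122,751 pairs from a single phone.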

Command used to submit the job to the Spark cluster:

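time sshpass -p 'password' ssh user@ip "nohup spark-submit --name sowhatJob --master yarn --deploy-mode client \
    --conf spark.cleaner.periodicGC.interval=120 --conf spark.executor.memory=20g \
    --conf spark.executor.instances=20 --conf spark.driver.memory=20g --conf spark.sql.shuffle.partitions=1500 \
    --conf spark.network.timeout=100000000 --queue root.kg \
    --class com.sowhat.PhoneBookSimilaryCal PhoneBookSimilaryCal1.jar"

Here root.kg is the YARN queue on the Hadoop cluster, and the executor count is set with spark.executor.instances (equivalent to --num-executors). Properties set in code through SparkSession.builder().config(...) take precedence over spark-submit flags, so each setting should be defined in only one place.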


Author: SoWhat1412
Original address: https://www.6miu.com/read-1250366.html
