[hdfs@bdddev-agent-205 bin]$ ./pyspark
Python 2.7.5 (default, Nov 6 2016, 00:28:07)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-11)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.5 (default, Nov 6 2016 00:28:07)
SparkSession available as 'spark'.
>>> from __future__ import division
>>> from pyspark.mllib.recommendation import ALS
>>> from pyspark.sql import HiveContext
>>> from pyspark.sql import SparkSession
>>> from collections import namedtuple
>>> import math
>>> import datetime
>>> spark = SparkSession.builder.appName("bjrecommender").enableHiveSupport().getOrCreate()
>>> sc = spark.sparkContext
>>> hiveCtx = HiveContext(sc)
>>> Rating = namedtuple("Rating", ["user", "product", "rating"])
>>> tid = namedtuple('tid', ['id', 'cust_id'])
>>> now = datetime.datetime.now()
>>> begin_date = (now - datetime.timedelta(days=150)).strftime('%Y%m%d')
>>> begin_date = (now - datetime.timedelta(days=200)).strftime('%Y%m%d')
>>> end_date = now.strftime('%Y%m%d')
>>> sql = "select dense_rank() over(order by cust_id) id,cust_id,item_id,need_score+sold_score score from (select cust_id,item_id,qty_need,qty_sold,ntile(5) over(partition by cust_id order by qty_need) need_score,ntile(5) over(partition by cust_id order by qty_sold) sold_score from (select cust_id,item_id,sum(qty_need) qty_need,sum(qty_sold) qty_sold from yxpt.pi_cust_item_day where date1>=" + begin_date + " and date1<=" + end_date + " group by cust_id,item_id) a1) b1"
>>> total = hiveCtx.sql(sql)
>>> id_custid = total.rdd.map(lambda x: tid(str(x[0]), x[1])).distinct()
17/08/30 09:18:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
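The preparation step above can also be written as a standalone script; spark.sql() can be used directly in place of HiveContext, which is deprecated in Spark 2.x. The WindowExec warning appears because dense_rank() is computed with no PARTITION BY, so all rows are shuffled into a single partition while the dense ids are assigned. A minimal sketch, assuming the same yxpt.pi_cust_item_day source table and the sql string built above:

# Sketch of the preparation step as a script (reuses the query string built above).
from collections import namedtuple
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bjrecommender").enableHiveSupport().getOrCreate()

Rating = namedtuple("Rating", ["user", "product", "rating"])   # (user, item, score) triple for ALS
tid = namedtuple("tid", ["id", "cust_id"])                     # dense id -> original cust_id

total = spark.sql(sql)  # spark.sql() instead of the deprecated HiveContext
id_custid = total.rdd.map(lambda x: tid(str(x[0]), x[1])).distinct()
ratings = total.rdd.map(lambda x: Rating(str(x[0]), int(x[2]), float(x[3])))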
>>> id_custid.toDF().registerTempTable("id_cid")
>>> ratings = total.rdd.map(lambda x: Rating(str(x[0]), int(x[2]), float(x[3])))
>>> ratings.toDF().show(3)
17/08/30 09:29:31 ERROR Utils: Uncaught exception in thread stdout writer for python2.7
java.net.SocketException: Socket is closed
        at java.net.Socket.shutdownOutput(Socket.java:1551)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$3.apply$mcV$sp(PythonRDD.scala:336)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$3.apply(PythonRDD.scala:336)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$3.apply(PythonRDD.scala:336)
        at org.apache.spark.util.Utils$.tryLog(Utils.scala:1964)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:336)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
+----+--------+------+
|user| product|rating|
+----+--------+------+
|   1|42010319|   2.0|
|   1|31010401|   2.0|
|   1|22240114|   2.0|
+----+--------+------+
only showing top 3 rows
>>>
>>> model = ALS.train(ratings, rank=15, iterations=10, seed=0, lambda_=0.001)
17/08/30 09:34:59 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
17/08/30 09:34:59 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
17/08/30 09:34:59 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
17/08/30 09:34:59 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
17/08/30 09:35:03 WARN Executor: 1 block locks were not released by TID = 4644: [rdd_3272_0]
17/08/30 09:35:03 WARN Executor: 1 block locks were not released by TID = 4645: [rdd_3273_0]
17/08/30 09:35:03 WARN Executor: 1 block locks were not released by TID = 4646: [rdd_3272_0]
17/08/30 09:35:03 WARN Executor: 1 block locks were not released by TID = 4647: [rdd_3273_0]
>>> all_rating = model.recommendProductsForUsers(30).map(lambda x: x[1]).collect()
>>> len(rating)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'rating' is not defined
>>> len(all_rating)
33695
>>> len(all_rating[0])
30
>>> userProducts = ratings.map(lambda rating: (rating.user, rating.product))
>>> predictions = model.predictAll(userProducts).map(lambda rating: ((rating.user, rating.product), rating.rating))
>>> type(predictions)
<class 'pyspark.rdd.PipelinedRDD'>
>>> predictions.toDF().show(3)
[Stage 258:=====================>                                   (3 + 5) / 8]17/08/30 09:44:26 WARN Executor: Managed memory leak detected; size = 15977666 bytes, TID = 4787
17/08/30 09:44:27 WARN Executor: Managed memory leak detected; size = 15977666 bytes, TID = 4788
+---------------+-----------------+
|             _1|               _2|
+---------------+-----------------+
|[4904,37020312]|4.338272324285362|
|[4904,32010112]|2.763827789148973|
|[4904,12010504]|6.962521675730641|
+---------------+-----------------+
only showing top 3 rows
>>> ratingsAndPredictions = ratings.map(lambda rating: ((int(rating.user), rating.product), rating.rating)).join(predictions)
>>> ratingsAndPredictions.toDF().show(3)
+----------------+--------------------+
|              _1|                  _2|
+----------------+--------------------+
| [4075,53100103]|[4.0,3.1492042815...|
|[20152,13070515]|[3.0,4.0453910858...|
| [1335,34030227]|[4.0,3.9336990908...|
+----------------+--------------------+
only showing top 3 rows
>>> MSE = ratingsAndPredictions.map(lambda ((x, y), (m, n)): math.pow(m - n, 2)).reduce(lambda x, y: x + y) / ratingsAndPredictions.count()
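The MSE line just above uses Python 2's tuple-parameter unpacking in the lambda, which is a syntax error on Python 3 (this session runs on Python 2.7.5). A sketch of an equivalent that works on both, over the same ((user, product), (actual, predicted)) pairs:

# Sketch: Python-3-compatible RMSE over the joined (actual, predicted) pairs.
MSE = (ratingsAndPredictions
       .map(lambda kv: (kv[1][0] - kv[1][1]) ** 2)
       .mean())               # mean() == sum of squared errors / count
RMSE = math.sqrt(MSE)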
>>> print "***************" + str(math.sqrt(MSE)) + "*****************"
***************1.39966771197*****************
>>> k = []
>>> for row in all_rating:
...     k.extend(row)
...
>>> all_rating_rdd = sc.parallelize(k)
>>> all_rating_rdd.toDF().registerTempTable("all_score")
17/08/30 10:04:58 WARN TaskSetManager: Stage 415 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
>>> hiveCtx.sql("select * from all_score limit 5").show(3)
17/08/30 10:08:19 WARN TaskSetManager: Stage 416 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
+-----+--------+------------------+
| user| product|            rating|
+-----+--------+------------------+
|27456|51520615| 35.52432167919441|
|27456|90020219|29.566229211420946|
|27456|34030316| 28.08260143903327|
+-----+--------+------------------+
only showing top 3 rows
>>> hiveCtx.sql("select a2.cust_id,a1.product,rating," + end_date + " date " + " from all_score a1,id_cid a2 " + "where a1.user=a2.id").show(5)
17/08/30 10:18:22 WARN TaskSetManager: Stage 417 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
+------------+--------+------------------+--------+
|     cust_id| product|            rating|    date|
+------------+--------+------------------+--------+
|110101100985|90190202| 25.33671962331747|20170830|
|110101100985|34030326|23.906456902069216|20170830|
|110101100985|90020726|16.010945324507635|20170830|
|110101100985|90190101|15.628847628582498|20170830|
|110101100985|90020727|15.442605374580097|20170830|
+------------+--------+------------------+--------+
only showing top 5 rows
>>>
>>> hiveCtx.sql("SELECT * from id_cid limit 5").show(5)
+-----+------------+
|   id|     cust_id|
+-----+------------+
| 4549|110105106838|
|12992|110108207746|
|30968|110228100250|
|22213|110114100048|
|19728|110113101105|
+-----+------------+
>>> hiveCtx.sql("select B.NATION_CUST_CODE CUST_CODE " + " from yxpt.CO_CUST B,id_cid A " + " where B.CUST_ID=A.CUST_ID limit 3").show(3)
+------------+
|   CUST_CODE|
+------------+
|110105106838|
|110108207746|
|110228100250|
+------------+
>>> hiveCtx.sql("select B.NATION_CUST_CODE CUST_CODE " + " from yxpt.CO_CUST B,id_cid A " + " where B.CUST_ID=A.CUST_ID limit 3 ").show(3)
+------------+
|   CUST_CODE|
+------------+
|110105106838|
|110108207746|
|110228100250|
+------------+
>>> hiveCtx.sql("select C.PACK_BAR " + " from yxpt.PLM_ITEM C,all_score D " + " where C.ITEM_ID=D.product limit 3").show(3)
17/08/30 15:08:24 WARN TaskSetManager: Stage 448 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
+-------------+
|     PACK_BAR|
+-------------+
|6901028032957|
|4893225033276|
|6901028208550|
+-------------+
>>>
>>> hiveCtx.sql("select * from all_score limit 5").show(3)
17/08/30 16:09:04 WARN TaskSetManager: Stage 465 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
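Collecting all 33,695 users' recommendation lists to the driver, flattening them with k.extend(), and re-parallelizing them is what produces the repeated "task of very large size" warnings above. The per-user lists can instead be flattened on the cluster; a minimal sketch using the same model and temp table name as above:

# Sketch: keep the recommendations distributed instead of collect() + sc.parallelize().
# recommendProductsForUsers(30) returns an RDD of (user, [Rating, ...]); flatMap unrolls
# the per-user lists into one RDD of Rating(user, product, rating) rows.
all_rating_rdd = model.recommendProductsForUsers(30).flatMap(lambda x: x[1])
all_rating_rdd.toDF().registerTempTable("all_score")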
+-----+--------+------------------+
| user| product|            rating|
+-----+--------+------------------+
|27456|51520615| 35.52432167919441|
|27456|90020219|29.566229211420946|
|27456|34030316| 28.08260143903327|
+-----+--------+------------------+
only showing top 3 rows
>>>
>>> hiveCtx.sql("select CO_CUST_T.NATION_CUST_CODE CUST_CODE " + " from yxpt.CO_CUST CO_CUST_T,id_cid id_cid_t " + " where CO_CUST_T.CUST_ID=id_cid_t.CUST_ID limit 3").show(3)
+------------+
|   CUST_CODE|
+------------+
|110105106838|
|110108207746|
|110228100250|
+------------+
>>> hiveCtx.sql("select PLM_ITEM_T.PACK_BAR " + " from yxpt.PLM_ITEM PLM_ITEM_T,all_score all_score_t " + " where PLM_ITEM_T.ITEM_ID=all_score_t.product limit 3").show(3)
17/08/30 18:32:13 WARN TaskSetManager: Stage 481 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
+-------------+
|     PACK_BAR|
+-------------+
|6901028032957|
|4893225033276|
|6901028208550|
+-------------+
>>> hiveCtx.sql("select * from all_score limit 5").show(3)
17/08/30 23:53:47 WARN TaskSetManager: Stage 482 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
+-----+--------+------------------+
| user| product|            rating|
+-----+--------+------------------+
|27456|51520615| 35.52432167919441|
|27456|90020219|29.566229211420946|
|27456|34030316| 28.08260143903327|
+-----+--------+------------------+
only showing top 3 rows
>>> hiveCtx.sql("insert into table yxpt.pi_cust_item_recommend2 PARTITION (ymday='" + end_date + "') select a2.cust_id,a1.product,rating " + " from all_score a1,id_cid a2 " + "where a1.user=a2.id")
17/08/31 00:13:05 WARN TaskSetManager: Stage 493 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
17/08/31 00:13:12 ERROR KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!
DataFrame[]
>>> len(all_rating)
33695
>>> hiveCtx.sql("select CO_CUST_T.NATION_CUST_CODE CUST_CODE " + " from yxpt.CO_CUST CO_CUST_T,id_cid id_cid_t " + " where CO_CUST_T.CUST_ID=id_cid_t.CUST_ID limit 3").show(3)
+------------+
|   CUST_CODE|
+------------+
|110105106838|
|110108207746|
|110228100250|
+------------+
>>> len(all_rating)
33695
>>> hiveCtx.sql("select B.NATION_CUST_CODE CUST_CODE " + " from yxpt.CO_CUST B,id_cid A " + " where B.CUST_ID=A.CUST_ID limit 3").show(3)
+------------+
|   CUST_CODE|
+------------+
|110105106838|
|110108207746|
|110228100250|
+------------+
>>> hiveCtx.sql("select A.CUST_ID,B.NATION_CUST_CODE CUST_CODE " + " from yxpt.CO_CUST B,id_cid A " + " where B.CUST_ID=A.CUST_ID limit 3").show(3)
+------------+------------+
|     CUST_ID|   CUST_CODE|
+------------+------------+
|110105106838|110105106838|
|110108207746|110108207746|
|110228100250|110228100250|
+------------+------------+
>>> hiveCtx.sql("select D.product,C.PACK_BAR " + " from yxpt.PLM_ITEM C,all_score D " + " where C.ITEM_ID=D.product limit 3").show(3)
17/08/31 04:02:23 WARN TaskSetManager: Stage 514 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
+--------+-------------+
| product|     PACK_BAR|
+--------+-------------+
|51520615|6901028032957|
|90020219|4893225033276|
|34030316|6901028208550|
+--------+-------------+
>>> hiveCtx.sql("select D.product,C.PACK_BAR " + " from yxpt.PLM_ITEM C,all_score D " + " where C.ITEM_ID=D.product limit 5").show(5)
17/08/31 04:02:50 WARN TaskSetManager: Stage 516 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
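The partitioned insert into yxpt.pi_cust_item_recommend2 above can also be written with an explicit JOIN, substituting only the partition value instead of concatenating the whole statement by hand; a sketch, assuming the same target table and the all_score/id_cid temp tables registered above:

# Sketch: same partitioned insert, with an explicit JOIN instead of the implicit comma join.
insert_sql = """
insert into table yxpt.pi_cust_item_recommend2 partition (ymday='{d}')
select a2.cust_id, a1.product, a1.rating
from all_score a1
join id_cid a2 on a1.user = a2.id
""".format(d=end_date)
hiveCtx.sql(insert_sql)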
+--------+-------------+
| product|     PACK_BAR|
+--------+-------------+
|51520615|6901028032957|
|90020219|4893225033276|
|34030316|6901028208550|
|90020923|4893225020443|
|51520635|6901028085724|
+--------+-------------+
>>> hiveCtx.sql("insert into table yxpt.pi_cust_item_recommend3 select A.CUST_ID,B.NATION_CUST_CODE CUST_CODE " + " from yxpt.CO_CUST B,id_cid A " + " where B.CUST_ID=A.CUST_ID ")
DataFrame[]
>>>
>>> hiveCtx.sql("insert into table yxpt.pi_cust_item_recommend4 select D.product,C.PACK_BAR " + " from yxpt.PLM_ITEM C,all_score D " + " where C.ITEM_ID=D.product ")
17/08/31 04:31:08 WARN TaskSetManager: Stage 524 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
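The two inserts above write the internal-id-to-external-code mappings (customer code and item barcode) into separate lookup tables. As a sketch, the recommendations could also be emitted in a single query that joins all of the lookups at once, assuming the same yxpt.CO_CUST and yxpt.PLM_ITEM tables and the all_score/id_cid temp tables from this session:

# Sketch: one query joining the recommendations with both external-code lookups.
result = hiveCtx.sql("""
select c.NATION_CUST_CODE cust_code, p.PACK_BAR pack_bar, s.rating
from all_score s
join id_cid i        on s.user = i.id
join yxpt.CO_CUST c  on c.CUST_ID = i.cust_id
join yxpt.PLM_ITEM p on p.ITEM_ID = s.product
""")
result.show(5)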