User协同过滤(基于Spark实现)

User协同过滤(基于Spark实现)

2023年6月23日发(作者:)

User协同过滤(基于Spark实现)推荐系统的作业流程:召回/match(推荐引擎)-> 物品候选集 -> 过滤 -> 排序 -> 策略(保证结果多样性) -> 推荐list协同过滤CF属于第⼀阶段,我们常常称之为“推荐引擎”。“推荐引擎”可以有多个基准,包括:基于相似⽤户、基于相似物品、基于特征搜索,以及基于热门等⽅式。通过不同的⽅式可以解决不同的问题,譬如冷启动问题,这⾥介绍的是基于相似⽤户的⽅式。在本⽂中,不会详细介绍代码,主要从逻辑上讲述。基本步骤:1、找出当前⽤户的若⼲个相似⽤户,取出每个相似⽤户购买过的商品(或打分过的电影)集合;2、基于当前⽤户的购买过的商品(或打分过的电影)集合,对其相似⽤户购买过的商品(或打分过的电影)集合进⾏过滤,得出存在相似⽤户,同时不存在当前⽤户的商品(或电影)集合;3、基于当前⽤户与相似⽤户之间的相似度,以及⽤户对商品(或电影)的打分,进⾏排序取topN,得到物品候选集具体实现:数据格式(user_id, item_id, rating, timestamp):⼀、创建源数据 这⾥采⽤数据源为hive,同样的,可以创建⼀张具有(user_id, item_id, rating, timestamp)字段的hive表,这样⼦就可以通过Spark的DF、SparkSQL等组建对数据进⾏处理。⼆、计算⽤户相似度 区别于上⼀篇的python实现,使⽤Spark实现,是使⽤向量的cosine定理。cosine=a*b/|a|*|b|每个⽤户相当于⼀个向量,他们各⾃购买过的商品是其对应向量的维度,维度值就是商品的分值。1、计算分母 通过向量的定义,那么在Spark操作hive数据当中,就显得特别容易,1. 对每⼀⾏数据的rating进⾏平⽅运算;2. 基于user_id进⾏聚合;3. 再对平⽅后的rating求和,再开根号;经过上述步骤,可以将所有的向量的模都求出来。得到的DF数据结构为:(user_id, rating_sqrt_sum)val userScoreSum = (x=>(x(0).toString,x(2).toString)) .groupByKey() .mapValues(x=>sqrt((rating=>pow(le,2)).sum)) .toDF("user_id","rating_sqrt_sum")2、计算分⼦ 分⼦部分是两个向量之间进⾏点乘,即向量之间的各个维度进⾏⼀⼀相乘,再相加。所以先基于原始DF,重新copy⼀份,作为相似⽤户的DF,然后基于item_id,对两张表进⾏聚合,构建了DF的数据结构为:(item_id, user_id, rating, user_v, rating_v)。有了这张表,就可以对rating和rating_v进⾏相乘,然后基于user_id、user_v做聚合操作,再将刚才rating和rating_v的乘积进⾏累加,就可以算出分⼦,计算得到的DF数据结构:(user_id, user_v, rating_dot)// 倒排表(基于item的笛卡⼉积) val vDataDF = Expr("user_id as user_v", "item_id", "rating as rating_v") val u_v_decare = (vDataDF,"item_id") .filter("case(rating as long)<>case(rating_v as long)")// 计算分⼦,维度值(rating)点乘,累加求和 val df_product = u_v_Expr("item_id","user_id","user_v","case(rating as double)*case(rating_v as double) as prod") val df_sim_group = df_y("user_id","user_v") .agg("prod"->"sum") .withColumnRenamed("sum(prod)","rating_dot")3、计算cosine 构建⼀个新的DF数据结构,分别基于user_id,user_v,将步骤1、2的DF进⾏聚合,得到(user_id, user_v, rating_dot,rating_sqrt_sum, rating_sqrt_sum_v),接着对每⼀⾏数据直接套⽤cosine公式,最后选取需要的字段,构成新数据结构:(user_id,user_v)。//计算整个cosineval vScoreSum = Expr("user_id as user_v","rating_sqrt_sum as rating_sqrt_sum_v")val df_sim_cosine = df_sim_group .join(userScoreSum,"user_id") .join(vScoreSum,"user_v") .selectExpr("user_id","user_v","rating_dot/(rating_sqrt_sum*rating_sqrt_sum_v) as cosine_sim")三、过滤商品,并对商品进⾏打分1、过滤 过滤商品之前,我们需要做得事情,⾸先获取topN个相似⽤户,然后取出这topN个相似⽤户所对应的物品集合,再进⾏过滤。1.1、获取topN相似⽤户 df_sim_cosine的结构⾥⾯是(user_id, user_v, cosine_sim),这个结构的数据已经是包括⽤户两两之间的相似度,换句话说,只要根据user_id做聚合,然后基于cosine做反向排序,slice切⽚,就可以取到user_id的topN个相似⽤户。//使⽤slice取得topN个相似⽤户val sim_user_topN = df_sim_(row=>(row(0).toString,(row(1).toString,row(2).toString))) .groupByKey() .mapValues(_.th((x,y)=>x._2>y._2).slice(0,10)) //列转⾏, RDD[(String, Array[(String, String)])] .flatMapValues(x=>x) //⾏转列, RDD[(String, (String, String))] .toDF("user_id","user_v_sim") .selectExpr("user_id","user_v_sim._1 as user_v","user_v_sim._2 as cosine_sim")//将⼀个tuple的字段拆分成两个字段1.2、获取user_id和其相似⽤户物品列表 经过上⼀步可以获取topN个相似⽤户的数据,只要分别基于user_id,user_v进⾏关联,就可以将商品列表给关联上,⽽且关联后的数据,附带了商品的打分,便于后⾯给候选商品列表打分。 val df_user_items = (row=>(row(0).toString,row(1).toString+"_"+row(2).toString)) .groupByKey() .mapValues(_.toArray) .toDF("user_id","item_rating_arr") val df_user_items_v = df_user_Expr("user_id as user_id_v", "item_rating_arr as item_rating_arr_v") //依次基于user_id、user_v聚合 val df_gen_item = sim_user_topN .join(df_user_items,"user_id") .join(df_user_items_v,"user_v")要知道,userDataDF的数据结构是(user_id, item_id, rating),所以,要获取“列表”,则必须对user_id进⾏聚合,这⾥做了⼀个格式处理,将item和rating⽤“_”连接,合并成⼀个数据处理。1.3、过滤商品 由于上⼀步对item和rating的数据结构进⾏处理,所以这⼀步需要定义⼀个UDF来对商品进⾏过滤。// ⽤⼀个udf从user_v的商品集合中,将与user_id具有相同的商品过滤掉,得到候选集 import ons._ val filter_udf = udf{(items:Seq[String],items_v:Seq[String])=> val fMap = {x=> val l = ("_") (l(0),l(1)) }.toMap //返回items_v,过滤商品 items_{x=> val l = ("_") lse(l(0),-1) == -1 } }items参数是user_id的商品集,items_v是user_v的商品集,使⽤该UDF后,会得到⼀个在user_v商品集基础上过滤掉user_id商品集的、全新的商品集,然后选取需要的列构建新的DF。//过滤掉user_id商品的DF数据(user_id, consine_sim, item_rating) val df_filter_item = df_gen_lumn("filtered_item", filter_udf(df_gen_item("item_rating_arr"),df_gen_item("item_rating_arr_v"))) .select("user_id","cosine_sim", "filtered_item")2、给候选商品进⾏打分(物品分数=⽤户相似度*相似⽤户对电影(物品)的打分) 经过过滤操作,我们得到⼀个数据结构(user_id, cosine_sim, filtered_item)的DF,现在显⽽易见,需要的参数已经有了,剩下的就是直接套⽤公式。但是不要忘记,filtered_item的数据是⼀个Array类型,是⼀个商品的集合,所以可以定义⼀个UDF,作⽤是遍历商品集合,分别乘以对应的cosine_sim。 val simRatingUDF = udf{(sim:Double,items:Seq[String])=> {item_rating=> val l = item_("_") l(0)+"_"+l(1).toDouble*sim } }得到的仍然是⼀个Array类型数据,即topN⾥的每个相似⽤户对应的物品的集合,我们最终要的是topN相似⽤户的商品集合组成的总的商品集合,再取topN个商品,所以,必须将Array拆开,可以使⽤explode。//DF:(user_id,item_prod) val itemSimRating = df_filter_lumn("item_prod",simRatingUDF(df_filter_item("cosine_sim"),df_filter_item("filtered_item"))) .select("user_id","item_prod") //⾏转列Array[item_prod],并分割item_pro。 // 注意:得出的数据结果,会出现多个相同的user_id->item,因为同⼀个user_id的不同相似⽤户,可能会有同⼀样商品,分割后,就出现这情况 val userItemScore = (itemSimRating("user_id"),explode(itemSimRating("item_prod"))) .selectExpr("user_id","split('item_prod','_')[0] as item_id","case(split('item_prod','_')[1] as double) as score")//将⼀个字符串的字段拆分成两个字段 这⾥⼜会出现⼀个问题,user_id有topN个相似⽤户,他们对应得到的商品集合⾥⾯,很⼤可能存在相同的item,那么就需要基于user_id和item_id做⼀个聚合,然后将相同item_id的打分进⾏累加,这才候选商品最后的打分。(区别于上⼀篇的python实现,它是基于杰卡尔德计算⽤户相似度,使⽤的是商品数量,相当于商品的原始权重都为1,并没有区分出⼀些具有代表性意义的商品,所以它还要对商品进⾏log降权处理;但是cosine计算相似度,直接使⽤rating值,rating值已经是对商品标上不同的权重)//DF:(user_id,item_prod) val itemSimRating = df_filter_lumn("item_prod",simRatingUDF(df_filter_item("cosine_sim"),df_filter_item("filtered_item"))) .select("user_id","item_prod") //⾏转列Array[item_prod],并分割item_pro。 // 注意:得出的数据结果,会出现多个相同的user_id->item,因为同⼀个user_id的不同相似⽤户,可能会有同⼀样商品,分割后,就出现这情况 val userItemScore = (itemSimRating("user_id"),explode(itemSimRating("item_prod"))) .selectExpr("user_id","split('item_prod','_')[0] as item_id","case(split('item_prod','_')[1] as double) as score")//将⼀个字符串的字段拆分成两个字段 //基于user_id和item_id做聚合 val userItemScoreSum = y("user_id","item_id") .agg("score"->"sum") .withColumnRenamed("sum(score)","last_score")四、取topN商品 //排序取topN商品 val df_rec = (row=>(row(0),(row(1).toString,row(2).toString))) .groupByKey() .mapValues(_.th((x,y)=>x._2>y._2).slice(0,10)) .flatMapValues(x=>x) .toDF("user_id","item_sim") .selectExpr("user_id","item_sim._1 as item_id", "item_sim._2 as score")

发布者:admin,转转请注明出处:http://www.yc00.com/news/1687516798a16291.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信