使用 Azure、Hadoop 和 Mahout 构建一个推荐系统

jopen 11年前

今天想帮助别人吗?

image根据用户之前的回答历史,我们可以把Stack Exchange的新问题推荐给一个能够回答的用户,这与亚马逊通过你之前的购买记录给你提供推荐很相似。不知道Stack Exchange是做什么的? - 它运行了很多的Q&A网站,包括广受欢迎的Stack Overflow。

我们目前的任务是如何分析用户之前的回答,从而预测该用户今后所能回答的问题。Stack Exchange目前的推荐逻辑可能比我们的好很多,但这不会阻止我们以个人学习目的去帮助他们;)。

我们将会做以下任务.

  • 从Stack Exchange数据集中提取所需的信息
  • 用提取的信息建立一个推荐

但是,一切从基础开始!如果你是第一次接触Apache Hadoop和Azure上的Hadoop,我建议你在开始之前阅读这些介绍性的文章,在我介绍HDInsight和Map Reduce模式时以便知道更详细的情况。

幕后 

让我们开始吧,先做一些分布式机器学习应用的“数据实验”,酷!

  • 建议 - 记得亚马逊的建议吧?基于历史来预测偏好。
  • 聚类  - 从一堆文件内找出相关的文档或从社区内找到志同道合的人等类似的任务。
  • 分类  - 识别一个新条目属于哪个分类,这通常包括一个先决的训练系统,还需要一个检测系统。

“大数据”这个术语通常在你需要处理一个非常大的数据集时使用。本文中,我们将从一个大的数据集中提取一些数据,然后根据我们提取的数据建立一个推荐。

推荐是什么?

从广义上讲,我们可以通过如下方式建立推荐:

  • 基于一个用户其类似用户之前回答的问题来寻找该用户可能愿意回答的问题。
  • 寻找跟他之前回答问题相似的问题。

image第一中技术手段称为基于用户的推荐,第二种称为基于产品的推荐。

在第一种情况下,会尝试确定你与该用户共同回答了多少问题(你们俩都回答的问题)。例如,现在有User1、User2、User3和User4 - 回答了Q1、Q2、Q3和Q4这几个问题。右边的图显示了用户及他们所回答的问题

根据这个图,User1和User2回答了Q1、Q2和Q3 - User2回答了Q2和Q3但没有回答Q1。现在,在一定程度上,我们可以放心地假设User3会有兴起回答Q1 - 因为两个和他一样都回答过Q2和Q3的用户已经回答过Q1。有点感觉了,是不是?所以,如果你有一个{用户名,问题ID}的对应数组 - 建立一个推荐所需的数据基本已经满足了。

逻辑表

现在,我们究竟该怎样建立一个问题推荐呢?事实上,它是相当简单的。

首先,我们需要找出用户与问题对共同出现的次数。请注意,该矩阵与用户无关。例如,如果Q1和Q2共同出现了2次(在上面的图中),则共同发生值{Q1,Q2}即为2。这是个共生矩阵(希望我是对的)。

  • Q1和Q2共同发生了2次 (User1、User2回答了Q1、Q2)
  • Q1和Q3共同发生了2次  (User1、User2都回答了Q1、Q2)
  • Q2和Q3共同发生了3次 (User1、User2和User3回答了Q2、Q3)
  • 其他的类似统计

image

上面的矩阵显示了上面我们所讨论的问题(回答)对发生的次数。现在还没有映射到用户。现在,我们如何将这个联系到用户喜好呢?要找出一个问题匹配用户的程度我们只需要:

  • 找出该问题与另外问题共同被这个用户回答的频率
  • 消除已经被该用户回答的问题。

第一步,我们需要将上述矩阵变更成用户的偏好矩阵。

让我们拿User3做个示范,对User3来说,因为他已回答了问题Q2、Q3而没有回答Q1、Q4,所以他对问题[Q1,Q2,Q3,Q4]的偏好映射为[0,1,1,0],现在,让我们把这个数据与上面的矩阵合并,注意这是一个点积矩阵。结果显示了用户连带回答各个问题的频率(权重)。

image

我们可以在结果中忽略Q2和Q3,因为我们已经知道用户回答了它们。现在剩下Q1和Q4 – Q1具有更高的值(4)更匹配User3。换种方式讲,这表明Q1相比Q4来说与User3已回答的问题(Q2和Q3)关联性更大–所以相比Q4,User3将更有兴趣回答Q1。在实际的应用中需要注意:随着用户的回答会逐渐集中,用户的偏好矩阵是将会是稀疏矩阵(主要是零)。该逻辑的优点:我们可以使用多个任务使用分布式的MapReduce模型来计算用户矩阵,找到每个用户的点积矩阵等。

查看我之前的例子或许对你有帮助:MapReduce介绍及例子

实现 

从实现的角度来看:

  1. 我们需要一个Hadoop集群
  2. 我们需要下载数据,解压后分析 (Stack Overflow数据)
  3. Job 1 – 提取数据 - 将用户回答的所有数据提取成{UserId, QuestionId}的形式,每条一行
  4. Job 2 – 建立推荐 - 使用上述的MapReduce模型输出的用户数据建立推荐。

让我们运行起来!!

Step 1 - 配置集群

记住,Stack Exchange的数据量是很大的,因此我们需要有一个对等的分布式环境来处理。我们用Windows Azure,如果没有帐户的话可以注册免费试用。进入起始页面,打开HDInsight(Azure Hadoop)预设。

如果HDInsight可用便可以很容易地创建一个Hadoop集群,我创建了一个命名为stackanalyzer的集群。

image

集群就绪的话,你会在仪表盘界面上看到连接和管理按钮(此处没有图片)。单击“连接”按钮就将连接到集群节点,这样会打开一个远程节点的桌面连接。你也可以点击“管理”按钮打开基于Web的仪表盘界面。(如果需要,请阅读关于HDInsight的更多内容

Step 2 - 分析你的数据

通过远程桌面连接到集群的头节点后就可以下载Stack Exchange数据了。你可以根据需要从Clear Bits下载Stack Exchange网站的数据。我在头结点上安装了Mu-Torrent客户端,然后下载并解压了http://cooking.stackexchange.com/的数据 – 解压出的文件是一堆XML文件。

image

我们感兴趣的东西在XML文件中。文件中的每一行包含一个问题或回答,如果是问题则posttypeid = 1,如果是回答则posttypeid = 2。ParentID代表回答对应的问题的ID,OwnerUserId代表做出该回答的人。

<row Id="16" PostTypeId="2" ParentId="2" CreationDate="2010-07-09T19:13:37.540" Score="3" 
     Body="&lt;p&gt;...shortenedforbrevity...  &lt;/p&gt;&#xA;" 
     OwnerUserId="34" LastActivityDate="2010-07-09T19:13:37.540" />

所以对于我们来说,我们需要提取PostTypeId=2(类型是回答)的所有发表内容的{OwnerUserId,ParentId},这是{User,Question,Votes}的一个表示。后面我们将使用这些数据生成推荐结果。

原文件的数据量很大,从中提取数据也是一个不小的工作。对于这个Cooking网站,它没有那么多的数据–但如果分析整个Stack Overflow的话,文件大小也有数GB。针对提取的这个操作我们可以使用Hadoop并编写一个自定义的MapReduce操作。

Step 3 - 从Dump(User,Question)中提取我们需要的数据

为了提取数据,我们将借助Hadoop来分配。首先要写一个简单的Mapper。就像前面提到过的,我们需要弄清楚所有PostTypeId=2的文章中的{OwnerUserId,ParentId}。这是因为我们要之后要为推荐工作输入的是{user,item}。基于此,首先要把Posts.XML加载到HDFS。你可以使用hadoop fs命令把本地文件复制到指定的输入路径。

image

现在,是时候开始写一个用户映射来提取数据了。我们将使用Hadoop On Azure .NET SDK来写Mapduce任务。不是我们在配置部分指明输入目录和输出目录。启动Visual Studio,创建一个C#控制台程序。如果你记得我之前写的文章,你会知道hadoop fs<yourcommand>是用来访问HDFS文件系统,当然如果你知道一些基本的*nix命令如 Is,cat等等会更好。

注意:: (之前的文章) 忽略HDInsight前面部分,你可以理解更多关于Map Reduce模型和Hadoop on Azure。

你需要通过Nuget包管理器来安装Hadop SDK for .NET上的Hadoop Map Reduce包。

install-package Microsoft.Hadoop.MapReduce 

有下面的代码,我们可以

  • 创建一个 映射
  • 创建一个任务
  • 提交任务到集群

具体如下:

using System;  using System.Collections.Generic;  using System.Globalization;  using System.Linq;  using System.Text;  using System.Xml.Linq;  using Microsoft.Hadoop.MapReduce;    namespace StackExtractor  {        //Our Mapper that takes a line of XML input and spits out the {OwnerUserId,ParentId,Score}       //i.e, {User,Question,Weightage}      public class UserQuestionsMapper : MapperBase      {          public override void Map(string inputLine, MapperContext context)          {              try              {                  var obj = XElement.Parse(inputLine);                  var postType = obj.Attribute("PostTypeId");                  if (postType != null && postType.Value == "2")                  {                      var owner = obj.Attribute("OwnerUserId");                      var parent = obj.Attribute("ParentId");                             // Write output data. Ignore records will null values if any                      if (owner != null && parent != null )                      {                          context.EmitLine(string.Format("{0},{1}", owner.Value, parent.Value));                      }                  }              }              catch              {                  //Ignore this line if we can't parse              }          }      }          //Our Extraction Job using our Mapper      public class UserQuestionsExtractionJob : HadoopJob<UserQuestionsMapper>      {          public override HadoopJobConfiguration Configure(ExecutorContext context)          {              var config = new HadoopJobConfiguration();              config.DeleteOutputFolder = true;              config.InputPath = "/input/Cooking";              config.OutputFolder = "/output/Cooking";              return config;          }                 }        //Driver that submits this to the cluster in the cloud      //And will wait for the result. This will push your executables to the Azure storage      //and will execute the command line in the head node (HDFS for Hadoop on Azure uses Azure storage)      public class Driver      {          public static void Main()          {              try              {                  var azureCluster = new Uri("https://{yoururl}.azurehdinsight.net:563");                  const string clusterUserName = "admin";                  const string clusterPassword = "{yourpassword}";                    // This is the name of the account under which Hadoop will execute jobs.                  // Normally this is just "Hadoop".                  const string hadoopUserName = "Hadoop";                    // Azure Storage Information.                  const string azureStorageAccount = "{yourstorage}.blob.core.windows.net";                  const string azureStorageKey =                      "{yourstoragekey}";                  const string azureStorageContainer = "{yourcontainer}";                  const bool createContinerIfNotExist = true;                  Console.WriteLine("Connecting : {0} ", DateTime.Now);                    var hadoop = Hadoop.Connect(azureCluster,                                              clusterUserName,                                              hadoopUserName,                                              clusterPassword,                                              azureStorageAccount,                                              azureStorageKey,                                              azureStorageContainer,                                              createContinerIfNotExist);                    Console.WriteLine("Starting: {0} ", DateTime.Now);                  var result = hadoop.MapReduceJob.ExecuteJob<UserQuestionsExtractionJob>();                  var info = result.Info;                    Console.WriteLine("Done: {0} ", DateTime.Now);                  Console.WriteLine("\nInfo From Server\n----------------------");                  Console.WriteLine("StandardError: " + info.StandardError);                  Console.WriteLine("\n----------------------");                  Console.WriteLine("StandardOut: " + info.StandardOut);                  Console.WriteLine("\n----------------------");                  Console.WriteLine("ExitCode: " + info.ExitCode);              }              catch(Exception ex)              {                  Console.WriteLine("Error: {0} ", ex.StackTrace.ToString(CultureInfo.InvariantCulture));               }              Console.WriteLine("Press Any Key To Exit..");              Console.ReadLine();          }      }      }

现在编译和运行上的程序。 执行工作(ExecuteJob)会上传所需的二进制文件到集群,并初始化一个Hadoop数据流工作(Streaming Job),它会在集群上运行我们的映射(Mappers),并输入存储在输入文件夹中的Posts文件。我们的控制台程序把任务提交到云端,并等待执行的结果。Hadoop SDK将更新映射-归并二进制文件到二进制容器(blob)中,并组建所需命令行来执行任务(见之前写的理解如何手动实现的文章-)。你可以点击桌面快捷方式中的Hadoop映射归并状态追踪来检查头结点中的任务。

如果一切进展顺利的话,你会看到如下结果:

image

正如你上面看到的,你可以在/output/Cooking目录中找到输出。如果你RDP到你集群的头结点,现在检查你的输出目录,你应该可以看到Map Reduce Job创建的如下文件。

image

正如你所期待的,这些文件包含了提取的数据,这些数据代表UserId,QuestionId--所有被一个用户回答的问题。如果你愿意,你可以把数据从HDFS加载到Hive,并用带有ODBC的Microsoft Excel观察到同样的结果。可以参看我之前写的文章。

第 4 步 – 构建推荐系统并产生推荐

作为下一步,我们需要构建共生矩阵并运行推荐作业,将{UserId,QuestionId}数据转变为推荐。幸运的是,我们不需要为此写Map Reduce作业。我们可以使用Hadoop的同时使用Mahout库。 这里阅读Mahout相关资料

RDP到集群的头节点,因为需要安装Mahout。下载本文写作时最新版本的Mahout (0.7),原样拷贝到集群头节点的c:\app\dist目录。

image

Mahout的推荐作业(Recommender Job)支持以多种算法创建推荐——在本例中,我们将使用 SIMILARITY_COOCCURRENCE。 Mahout网站的 算法页面有更多关于推荐,聚类和分类的算法信息。我们将使用/output/Cooking目录的文件创建推荐信息。

该运行推荐作业了。创建一个users.txt,在该文件中记下推荐用户的ID,并且拷贝到HDFS(Hadoop Distributed File System)。

image

现在,接下来的命令将启动推荐作业。记住,我们会用上面Map Reduce作业的输出文件作为推荐系统的输入。我们点击启动这个推荐作业。这会在/recommend/目录下产生输出,就是在users.txt文件中指定的所有用户。可以使用–num 推荐开关指定针对每个用户的推荐数量。如果用户与条目之间有偏好关系,(如用户播放一首歌的次数),那么可以采用{user,item,preferencevalue}形式给推荐系统提供输入数据集——在本例中,我们省略了偏好权重。

注意: 如果下面的命令在运行后失败了,提示输出目录已经存在,就试试使用hadoop fs –rmr temp and hadoop fs –rmr /recommend/  删除tmp目录和输出目录。

hadoop jar c:\Apps\dist\mahout-0.7\mahout-core-0.7-job.jar    org.apache.mahout.cf.taste.hadoop.item.RecommenderJob -s SIMILARITY_COOCCURRENCE    --input=/output/Cooking    --output=/recommend/    --usersFile=/data/users.txt 

作业结束以后,检查/recommend/文件夹,试着打印一下生成文件的内容。你将会看到top推荐,针对users.txt文件中的每个用户Id。

image

因此,如果给用户类似建议,推荐引擎会认为User  1393可以回答问题6419, 16897 等等。你可以试验一下其他类似的类,像SIMILARITY_LOGLIKELIHOOD, SIMILARITY_PEARSON_CORRELATION等等,以便找到最好的结果。迭代与优化到你满意为止。

不过这里尝试的是另一种实践——检查Stack Exchange数据集,找出怎样去创建一个推荐,根据用户喜欢的问题,显示出'你或许也会喜欢‘的问题?

总结

在这个例子中,我们手动上传所需的输入文件到HDFS,并手动触发推荐系统任务(Recommender Job)。事实上, 你可以利用Hadoop For Azure SDK自动完成整个的工作流。但这是另一篇文章要讲的,敬请期待。现实中有很多分析工作要做,包括为提取并转存数据到HDFS编写映射/归并,自动创建hive表,用HiveQL或者PIG执行操作等等。然而,我们刚刚检查的这些步骤涉及到用Azure,Hadoop和Mahout做一些有意义的事。

你在你的移动应用或者ASP.NET程序中访问这些数据,或者使用Sqoop导入到SQL Server,或者把数据加载到Hive表中,这是我刚提到的。快乐编程,机器学习(Happy Coding and Machine Learning)。把HD Insight捆绑到现存的应用来建立端对端工作流,如果你对这种案列比较感兴趣,你可以联系我

我推荐你继续阅读

  • 我之前的文章            *关于 HDInsight
  • Mahout实践               *非常不错的开始,如果你想知道更多细节。
  • Mahout 教程 –           *一系列关于mahout的教程