Python开发的 dht网络爬虫

jopen 10年前

使用 libtorrent 的python绑定库实现一个dht网络爬虫,抓取dht网络中的磁力链接。

 

dht 网络简介

p2p网络

在P2P网络中,通过种子文件下载资源时,要知道资源在P2P网络中哪些计算机中,这些传输资源的计算机称作peer。在传统的P2P网络中,使用tracker服务器跟踪资源的peer。要下载资源,首先需要取得这些peer。

 

dht网络

tracker服务器面临一些版权和法律问题。于是出现了DHT,它把tracker上的资源peer信息分散到了整个网络中。dht网络是由分布 式节点构成,节点(node)是实现了DHT协议的p2p客户端。P2P客户端程序既是peer也是node。DHT网络有多种算法,常用的有 Kademlia。

 

dht网络下载

P2P客户端使用种子文件下载资源时,如果没有tracker服务器,它就向DHT网络查询资源的peer列表, 然后从peer下载资源。

 

Magnet是磁力链接

资源的标识在DHT网络中称为infohash,是一个通过sha1算法得到的20字节长的字符串。infohash是使用种子文件的文件描述信息 计算得到。磁力链接是把infohash编码成16进制字符串得到。P2P客户端使用磁力链接,下载资源的种子文件,然后根据种子文件下载资源。

 

Kademlia 算法

Kademlia是DHT网络的一种实现, 具体的算法参见:DHT协议

 

KRPC 协议

KRPC 是节点之间的交互协议,使用UDP来传送。

包括4种请求:ping,find_node,get_peer,announce_peer。其中get_peer和announce_peer是节点间查询资源的主要消息。

 

dht 爬虫原理

主要的思路就是伪装为p2p客户端,加入dht网络,收集dht网络中的get_peer和announce_peer消息,这些消息是其他node发送给伪装的p2p客户端的udp消息。

 

本文dht爬虫的实现

爬虫运行环境

  1. linux 系统

  2. python 2.7

  3. libtorrent 库的python绑定

  4. twisted 网络库

  5. 防火墙开启固定的udp和tcp端口

 

libtorrent 库的介绍

libtorrent库是p2p下载的客户端库,有丰富的接口,可以用来开发下载p2p网络上的资源。它有python的绑定库,本爬虫就是使用它的python库开发的。

在libtorrent中有几个概念需要解释下。 session 相当于p2p客户端,session开启一个tcp和一个udp端口,用来与其他p2p客户端交换数据。可以在一个进程内定义多个session,也就是多个p2p客户端,来加快收集速度。

alert是libtorrent中用来收集各种消息的队列,每个session都有一个自己的alert消息队列。KRPC协议的get_peer和announce_peer消息也是从这个队列中获取,就是用这两个消息收集磁力链接的。

 

主要实现代码

爬虫实现的主要代码比较简单

# 事件通知处理函数      def _handle_alerts(self, session, alerts):          while len(alerts):              alert = alerts.pop()              # 获取dht_announce_alert和dht_get_peer_alert消息              # 从这两消息收集磁力链接              if isinstance(alert, lt.add_torrent_alert):                  alert.handle.set_upload_limit(self._torrent_upload_limit)                  alert.handle.set_download_limit(self._torrent_download_limit)              elif isinstance(alert, lt.dht_announce_alert):                  info_hash = alert.info_hash.to_string().encode('hex')                  if info_hash in self._meta_list:                      self._meta_list[info_hash] += 1                  else:                      self._meta_list[info_hash] = 1                      self._current_meta_count += 1              elif isinstance(alert, lt.dht_get_peers_alert):                  info_hash = alert.info_hash.to_string().encode('hex')                  if info_hash in self._meta_list:                      self._meta_list[info_hash] += 1                  else:                      self._infohash_queue_from_getpeers.append(info_hash)                      self._meta_list[info_hash] = 1                      self._current_meta_count += 1        def start_work(self):          '''主工作循环,检查消息,显示状态'''          # 清理屏幕          begin_time = time.time()          show_interval = self._delay_interval          while True:              for session in self._sessions:                  session.post_torrent_updates()                  # 从队列中获取信息                  self._handle_alerts(session, session.pop_alerts())              time.sleep(self._sleep_time)              if show_interval > 0:                  show_interval -= 1                  continue              show_interval = self._delay_interval                # 统计信息显示              show_content = ['torrents:']              interval = time.time() - begin_time              show_content.append('  pid: %s' % os.getpid())              show_content.append('  time: %s' %                                  time.strftime('%Y-%m-%d %H:%M:%S'))              show_content.append('  run time: %s' % self._get_runtime(interval))              show_content.append('  start port: %d' % self._start_port)              show_content.append('  collect session num: %d' %                                  len(self._sessions))              show_content.append('  info hash nums from get peers: %d' %                                  len(self._infohash_queue_from_getpeers))              show_content.append('  torrent collection rate: %f /minute' %                                  (self._current_meta_count * 60 / interval))              show_content.append('  current torrent count: %d' %                                  self._current_meta_count)              show_content.append('  total torrent count: %d' %                                  len(self._meta_list))              show_content.append('\n')                # 存储运行状态到文件              try:                  with open(self._stat_file, 'wb') as f:                      f.write('\n'.join(show_content))                  with open(self._result_file, 'wb') as f:                      json.dump(self._meta_list, f)              except Exception as err:                  pass                # 测试是否到达退出时间              if interval >= self._exit_time:                  # stop                  break                # 每天结束备份结果文件              self._backup_result()            # 销毁p2p客户端          for session in self._sessions:              torrents = session.get_torrents()              for torrent in torrents:                  session.remove_torrent(torrent)

 

运行效率

在我的一台512M内存,单cpu机器上。爬虫刚开始运行稍慢,运行几分钟后收集速度稳定在 180个每分钟,1小时采集10000左右。

运行状态

run times: 12    torrents:    pid: 11480  time: 2014-08-18 22:45:01    run time: day: 0, hour: 0, minute: 12, second: 25    start port: 32900    collect session num: 20    info hash nums from get peers: 2222    torrent collection rate: 179.098480 /minute    current torrent count: 2224    total torrent count: 58037

 

爬虫完整代码

完整的代码参见:https://github.com/blueskyz/DHTCrawler

还包括一个基于twisted的监控进程,用来查看爬虫状态,在爬虫进程退出后重新启动。

 

原文链接:python开发的 dht网络爬虫