DHT協(xié)議作為BT協(xié)議的一個(gè)輔助,是非常好玩的。它主要是為了在BT正式下載時(shí)得到種子或者BT資源。傳統(tǒng)的網(wǎng)絡(luò),需要一臺(tái)中央服務(wù)器存放種子或者BT資源,不僅浪費(fèi)服務(wù)器資源,還容易出現(xiàn)單點(diǎn)的各種問題,而DHT網(wǎng)絡(luò)則是為了去中心化,也就是說任意時(shí)刻,這個(gè)網(wǎng)絡(luò)總有節(jié)點(diǎn)是亮的,你可以去詢問問這些亮的節(jié)點(diǎn),從而將自己加入DHT網(wǎng)絡(luò)。
要實(shí)現(xiàn)DHT協(xié)議的網(wǎng)絡(luò)爬蟲,主要分3步,第一步是得到資源信息(infohash,160bit,20字節(jié),可以編碼為40字節(jié)的十六進(jìn)制字符串),第二步是確認(rèn)這些infohash是有效的,第三步是通過有效的infohash下載到BT的種子文件,從而得到對(duì)這個(gè)資源的完整描述。
其中第一步是其他節(jié)點(diǎn)用DHT協(xié)議中的get_peers方法向爬蟲發(fā)送請求得到的,第二步是其他節(jié)點(diǎn)用DHT協(xié)議中的announce_peer向爬蟲發(fā)送請求得到的,第三步可以有幾種方式得到,比如可以去一些保存種子的網(wǎng)站根據(jù)infohash直接下載到,或者通過announce_peer的節(jié)點(diǎn)來下載到,具體如何實(shí)現(xiàn),可以取決于你自己的爬蟲。
DHT協(xié)議中的主要幾個(gè)操作:
主要負(fù)責(zé)通過UDP與外部節(jié)點(diǎn)交互,封裝4種基本操作的請求以及相應(yīng)。
ping:檢查一個(gè)節(jié)點(diǎn)是否“存活”
在一個(gè)爬蟲里主要有兩個(gè)地方用到ping,第一是初始路由表時(shí),第二是驗(yàn)證節(jié)點(diǎn)是否存活時(shí)
find_node:向一個(gè)節(jié)點(diǎn)發(fā)送查找節(jié)點(diǎn)的請求
在一個(gè)爬蟲中主要也是兩個(gè)地方用到find_node,第一是初始路由表時(shí),第二是驗(yàn)證桶是否存活時(shí)
get_peers:向一個(gè)節(jié)點(diǎn)發(fā)送查找資源的請求
在爬蟲中有節(jié)點(diǎn)向自己請求時(shí)不僅像個(gè)正常節(jié)點(diǎn)一樣做出回應(yīng),還需要以此資源的info_hash為機(jī)會(huì)盡可能多的去認(rèn)識(shí)更多的節(jié)點(diǎn)。如圖,get_peers實(shí)際上最后一步是announce_peer,但是因?yàn)榕老x不能announce_peer,所以實(shí)際上get_peers退化成了find_node操作。
announce_peer:向一個(gè)節(jié)點(diǎn)發(fā)送自己已經(jīng)開始下載某個(gè)資源的通知
爬蟲中不能用announce_peer,因?yàn)檫@就相當(dāng)于通報(bào)虛假資源,對(duì)方很容易從上下文中判斷你是否通報(bào)了虛假資源從而把你禁掉。
基于Python的DHT爬蟲
修改自github開源爬蟲,原作者名字有些。,這里直接將項(xiàng)目地址列出:https://github.com/Fuck-You-GFW/simDHT,有g(shù)ithub帳號(hào)的請給原作者star,后續(xù)我將結(jié)果放入db,外加用tornado做一個(gè)簡單的查詢界面出來放在github上,先備份一下代碼
#!/usr/bin/env python # encoding: utf-8 import socket from hashlib import sha1 from random import randint from struct import unpack from socket import inet_ntoa from threading import Timer, Thread from time import sleep from collections import deque from bencode import bencode, bdecode BOOTSTRAP_NODES = ( ("router.bittorrent.com", 6881), ("dht.transmissionbt.com", 6881), ("router.utorrent.com", 6881) ) TID_LENGTH = 2 RE_JOIN_DHT_INTERVAL = 3 TOKEN_LENGTH = 2 def entropy(length): return "".join(chr(randint(0, 255)) for _ in xrange(length)) def random_id(): h = sha1() h.update(entropy(20)) return h.digest() def decode_nodes(nodes): n = [] length = len(nodes) if (length % 26) != 0: return n for i in range(0, length, 26): nid = nodes[i:i+20] ip = inet_ntoa(nodes[i+20:i+24]) port = unpack("!H", nodes[i+24:i+26])[0] n.append((nid, ip, port)) return n def timer(t, f): Timer(t, f).start() def get_neighbor(target, nid, end=10): return target[:end]+nid[end:] class KNode(object): def __init__(self, nid, ip, port): self.nid = nid self.ip = ip self.port = port class DHTClient(Thread): def __init__(self, max_node_qsize): Thread.__init__(self) self.setDaemon(True) self.max_node_qsize = max_node_qsize self.nid = random_id() self.nodes = deque(maxlen=max_node_qsize) def send_krpc(self, msg, address): try: self.ufd.sendto(bencode(msg), address) except Exception: pass def send_find_node(self, address, nid=None): nid = get_neighbor(nid, self.nid) if nid else self.nid tid = entropy(TID_LENGTH) msg = { "t": tid, "y": "q", "q": "find_node", "a": { "id": nid, "target": random_id() } } self.send_krpc(msg, address) def join_DHT(self): for address in BOOTSTRAP_NODES: self.send_find_node(address) def re_join_DHT(self): if len(self.nodes) == 0: self.join_DHT() timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT) def auto_send_find_node(self): wait = 1.0 / self.max_node_qsize while True: try: node = self.nodes.popleft() self.send_find_node((node.ip, node.port), node.nid) except IndexError: pass sleep(wait) def process_find_node_response(self, msg, address): nodes = decode_nodes(msg["r"]["nodes"]) for node in nodes: (nid, ip, port) = node if len(nid) != 20: continue if ip == self.bind_ip: continue if port < 1 or port > 65535: continue n = KNode(nid, ip, port) self.nodes.append(n) class DHTServer(DHTClient): def __init__(self, master, bind_ip, bind_port, max_node_qsize): DHTClient.__init__(self, max_node_qsize) self.master = master self.bind_ip = bind_ip self.bind_port = bind_port self.process_request_actions = { "get_peers": self.on_get_peers_request, "announce_peer": self.on_announce_peer_request, } self.ufd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) self.ufd.bind((self.bind_ip, self.bind_port)) timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT) def run(self): self.re_join_DHT() while True: try: (data, address) = self.ufd.recvfrom(65536) msg = bdecode(data) self.on_message(msg, address) except Exception: pass def on_message(self, msg, address): try: if msg["y"] == "r": if msg["r"].has_key("nodes"): self.process_find_node_response(msg, address) elif msg["y"] == "q": try: self.process_request_actions[msg["q"]](msg, address) except KeyError: self.play_dead(msg, address) except KeyError: pass def on_get_peers_request(self, msg, address): try: infohash = msg["a"]["info_hash"] tid = msg["t"] nid = msg["a"]["id"] token = infohash[:TOKEN_LENGTH] msg = { "t": tid, "y": "r", "r": { "id": get_neighbor(infohash, self.nid), "nodes": "", "token": token } } self.send_krpc(msg, address) except KeyError: pass def on_announce_peer_request(self, msg, address): try: infohash = msg["a"]["info_hash"] #print msg["a"] tname = msg["a"]["name"] token = msg["a"]["token"] nid = msg["a"]["id"] tid = msg["t"] if infohash[:TOKEN_LENGTH] == token: if msg["a"].has_key("implied_port") and msg["a"]["implied_port"] != 0: port = address[1] else: port = msg["a"]["port"] if port < 1 or port > 65535: return self.master.log(infohash, (address[0], port),tname) except Exception: pass finally: self.ok(msg, address) def play_dead(self, msg, address): try: tid = msg["t"] msg = { "t": tid, "y": "e", "e": [202, "Server Error"] } self.send_krpc(msg, address) except KeyError: pass def ok(self, msg, address): try: tid = msg["t"] nid = msg["a"]["id"] msg = { "t": tid, "y": "r", "r": { "id": get_neighbor(nid, self.nid) } } self.send_krpc(msg, address) except KeyError: pass class Master(object): def log(self, infohash,address=None,tname=None): hexinfohash = infohash.encode("hex") print "info_hash is: %s,name is: %s from %s:%s" % ( hexinfohash,tname, address[0], address[1] ) print "magnet:?xt=urn:btih:%s&dn=%s" % (hexinfohash, tname) # using example if __name__ == "__main__": # max_node_qsize bigger, bandwith bigger, speed higher dht = DHTServer(Master(), "0.0.0.0", 6882, max_node_qsize=200) dht.start() dht.auto_send_find_node()
PS: DHT協(xié)議中有幾個(gè)重點(diǎn)的需要澄清的地方:
1. node與infohash同樣使用160bit的表示方式,160bit意味著整個(gè)節(jié)點(diǎn)空間有2^160 = 730750818665451459101842416358141509827966271488,是48位10進(jìn)制,也就是說有百億億億億億個(gè)節(jié)點(diǎn)空間,這么大的節(jié)點(diǎn)空間,是足夠存放你的主機(jī)節(jié)點(diǎn)以及任意的資源信息的。
2. 每個(gè)節(jié)點(diǎn)有張路由表。每張路由表由一堆K桶組成,所謂K桶,就是桶中最多只能放K個(gè)節(jié)點(diǎn),默認(rèn)是8個(gè)。而桶的保存則是類似一顆前綴樹的方式。相當(dāng)于一張8桶的路由表中最多有160-4個(gè)K桶。
3. 根據(jù)DHT協(xié)議的規(guī)定,每個(gè)infohash都是有位置的,因此,兩個(gè)infohash之間就有距離一說,而兩個(gè)infohash的距離就可以用異或來表示,即infohash1 xor infohash2,也就是說,高位一樣的話,他們的距離就近,反之則遠(yuǎn),這樣可以快速的計(jì)算兩個(gè)節(jié)點(diǎn)的距離。計(jì)算這個(gè)距離有什么用呢,在DHT網(wǎng)絡(luò)中,如果一個(gè)資源的infohash與一個(gè)節(jié)點(diǎn)的infohash越近則該節(jié)點(diǎn)越有可能擁有該資源的信息,為什么呢?可以想象,因?yàn)槿巳硕加猛瑯拥木嚯x算法去遞歸的詢問離資源接近的節(jié)點(diǎn),并且只要該節(jié)點(diǎn)做出了回應(yīng),那么就會(huì)得到一個(gè)announce信息,也就是說跟資源infohash接近的節(jié)點(diǎn)就有更大的概率拿到該資源的infohash
4. 根據(jù)上述算法,DHT中的查詢是跳躍式查詢,可以迅速的跨越的的節(jié)點(diǎn)桶而接近目標(biāo)節(jié)點(diǎn)桶。之所以在遠(yuǎn)處能夠大幅度跳躍,而在近處只能小幅度跳躍,原因是每個(gè)節(jié)點(diǎn)的路由表中離自身越接近的節(jié)點(diǎn)保存得越多,如下圖
5. 在一個(gè)DHT網(wǎng)絡(luò)中當(dāng)爬蟲并不容易,不像普通爬蟲一樣,看到資源就可以主動(dòng)爬下來,相反,因?yàn)榈玫劫Y源的方式(get_peers, announce_peer)都是被動(dòng)的,所以爬蟲的方式就有些變化了,爬蟲所要做的事就是像個(gè)正常節(jié)點(diǎn)一樣去響應(yīng)其他節(jié)點(diǎn)的查詢,并且得到其他節(jié)點(diǎn)的回應(yīng),把其中的數(shù)據(jù)收集下來就算是完成工作了。而爬蟲唯一能做的,是盡可能的去多認(rèn)識(shí)其他節(jié)點(diǎn),這樣,才能有更多其他節(jié)點(diǎn)來向你詢問。
6. 有人說,那么我把DHT爬蟲的K桶中的容量K增大是不是就能增加得到資源的機(jī)會(huì),其實(shí)不然,之前也分析過了,DHT爬蟲最重要的信息來源全是被動(dòng)的,因?yàn)槟悴荒茉龃髣e人的K,所以距離遠(yuǎn)的節(jié)點(diǎn)保存你自身的概率就越小,當(dāng)然距離遠(yuǎn)的節(jié)點(diǎn)去請求你的概率相對(duì)也比較小。
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問題請及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com