aardio 文档
aardio 范例: DHT 客户端
import win.ui;
import win.debounce;
import wsock.bt;
import thread.works;
/*DSG{{*/
var winform = win.form(text="DHT 客户端";right=828;bottom=574)
winform.add(
editDhtNodes={cls="edit";left=14;top=16;right=812;bottom=384;db=1;dl=1;dr=1;dt=1;edge=1;multiline=1;vscroll=1;z=1};
editMetadata={cls="edit";left=14;top=389;right=813;bottom=563;db=1;dl=1;dr=1;edge=1;multiline=1;vscroll=1;z=2};
splitter={cls="splitter";left=14;top=384;right=812;bottom=389;frame=1;horz=1;z=3}
)
/*}}*/
// DHT 路由表
var routingTable = wsock.bt.kBucket("/dht.dat");
// DHT 客户端
var dhtClient,err = wsock.bt.dhtClient("6881-7999",routingTable);
assert(dhtClient,err)
// 多线程下载元数据
var metadataWorker = thread.works( 8,
function(peersInfo,winform) {
import wsock.bt.metadataClient;
import bencode;
var exClient = wsock.bt.metadataClient(peersInfo.infoHash,peersInfo.nodeId);
for(i=1;#peersInfo){
var metadata,err = exClient.getMetadata(peersInfo[i].ip,peersInfo[i].port);
if(metadata){
var metaInfo = bencode.decode(metadata);
bencode.save("/"+string.hex(peersInfo.infoHash)+".torrent", {info=metaInfo})
//可重写为保存到数据库
..io.print(i,"获取 torrent 成功:", metaInfo.name);
winform.onGetMetadataComplete(peersInfo.infoHash,true);
return; // 成功后即返回,不再尝试其他 peer
}
else{
..io.stderr.write(err,string.hex(peersInfo.infoHash),'\n')
}
}
winform.onGetMetadataComplete(peersInfo.infoHash,false);
},/*waitTime*/,/*init*/,winform
);
/*
info hash 队列。
多 DHT 客户端可修改为从数据库服务端存取。
*/
var infoHashQueue = {
count = 0;
hashes = {};
getPeersAttempts = {};
getPeersQueue = [];
add = function(infoHash){
if(owner.hashes[infoHash]){
return;
}
owner.hashes[infoHash] = "lookup";
owner.getPeersAttempts[infoHash] = 0;
table.push(owner.getPeersQueue,infoHash);
winform.editMetadata.print("发现 Magnet 链接:"
, "magnet:?xt=urn:btih:" + string.hex(infoHash));
owner.count = owner.count + 1;
return true;
};
maxAttempts = 20;
next = function(){
var q = owner.getPeersQueue;
var infoHash = q[1];
if(!infoHash) return;
if(
owner.getPeersAttempts[infoHash]>=owner.maxAttempts
&& (#q > 1) ){
..table.shift(q);
..table.push(q,infoHash)
infoHash = q[1];
}
owner.hashes[infoHash] = "busy";
owner.getPeersAttempts[infoHash] = owner.getPeersAttempts[infoHash] + 1;
return infoHash,owner.getPeersAttempts[infoHash];
};
queriedNodes = ..table.cache();
getClosestNodes = function(infoHash){
var queriedNodes = owner.queriedNodes[infoHash]
if(!queriedNodes){
queriedNodes = {}
owner.queriedNodes[infoHash] = queriedNodes;
}
//刷新最接近节点
var closestNodes = routingTable.getClosestNodes(infoHash,50);
var result = [];
var limit = 0;
//排除已查询节点
for(i, node in closestNodes) {
if(!queriedNodes[node.nodeId]){
queriedNodes[node.nodeId] = 1;
table.push(result,node);
limit++;
if(limit>8) return result; //避免一次发送过多请求
}
}
return result;
};
peersCache = {};
delayedMetadataFetch = {};
onPeersDiscovered = function(infoHash,peers){
// 合并同一个 info hash 的 peers
owner.peersCache[infoHash] = table.append(owner.peersCache[infoHash]||[],peers);
// 延时下载
if(!owner.delayedMetadataFetch[infoHash]){
var this = owner;
owner.delayedMetadataFetch[infoHash] = win.debounce(
function(){
var peers = this.peersCache[infoHash];
peers.nodeId = dhtClient.nodeId
peers.infoHash = infoHash;
// 发送给下载线程
metadataWorker.push( peers );
winform.editMetadata.print("downloading/peers:"+#peers,string.hex(infoHash));
this.peersCache[infoHash] = null;
},10000
);
}
owner.delayedMetadataFetch[infoHash]();
// 停止发送 get peers ,切换到下一个 infoHash
owner.stopGetPeers(infoHash);
};
stopGetPeers = function(infoHash){
var idx = table.removeByValue(owner.getPeersQueue,infoHash);
owner.getPeersAttempts[infoHash] = 0;
owner.hashes[infoHash] = "metadata";
};
complete = function(infoHash,success){
owner.hashes[infoHash] = success ? "completed" : "failed";;
};
}
winform.onGetMetadataComplete = function(infoHash,success){
winform.editMetadata.print(success?"下载成功":"下载失败",string.hex(infoHash));
infoHashQueue.complete(infoHash,success);
}
// 退出保存 DHT 路由表
winform.onDestroy = function(){
routingTable.save();
metadataWorker.quit(5000);
}
dhtClient.onGetPeers = lambda (infoHash) infoHashQueue.add(infoHash);
dhtClient.onAnnouncePeer = lambda (infoHash) infoHashQueue.add(infoHash);
var onNodesOrPeersDiscovered = function(nodes,peers,ip,port,infoHash){
if(peers){
infoHashQueue.onPeersDiscovered(infoHash,peers);
}
elseif(nodes){
for i,node in nodes{
if routingTable.addNode(node) {
winform.editDhtNodes.print("发现新节点:"
, node.ip, node.port, "总节点数:", routingTable.getNodeCount());
}
}
}
}
// 定时查找可以获取元数据的 peers
winform.setInterval(
function(){
var infoHash,attempts = infoHashQueue.next()
if(!infoHash) return;
io.print("get peers:",attempts,string.hex(infoHash))
var closestNodes = infoHashQueue.getClosestNodes(infoHash);
for(i, node in closestNodes) {
dhtClient.sendGetPeers(node.ip, node.port,infoHash
,onNodesOrPeersDiscovered);
}
},1000
)
// 定时发现新的 DHT 节点
winform.setInterval(
function(){
var closestNodes = routingTable.getClosestNodes(string.random(20), 8);
for(i, node in closestNodes) {
dhtClient.sendFindNode(node.ip, node.port,
string.random(20), onNodesOrPeersDiscovered);
}
},1500 //当路由表增大发现新节点速度会越来越快,这时候应当增加延时,减慢速度
)
// 收到 DHT 请求消息
dhtClient.onQuery = function(ip,port,nodeId){
if( #nodeId == 20 ){
var added = routingTable.addNode({
ip = ip,
port = port,
nodeId = nodeId,
lastSeen = time()
});
if(added){
winform.editDhtNodes.print("添加新节点:", ip, port, "总节点数:", routingTable.getNodeCount());
}
}
}
// 定期清理旧节点
winform.setInterval(
function(){
routingTable.cleanOldNodes();
winform.editDhtNodes.print("═════════ 路由表状态 ═════════");
winform.editDhtNodes.print("总节点数:", routingTable.getNodeCount());
winform.editDhtNodes.print("InfoHash 数:", infoHashQueue.count );
},10000
)
winform.splitter.split(winform.editDhtNodes,winform.editMetadata);
if(_STUDIO_INVOKED){io.open(); }
winform.show();
win.loopMessage();
/*
可用 process.aria2 启动 BT 下载任务。
aria2 的本机 BT/DHT 服务端口可以通过本机 IP "127.0.0.1" 去连接。
DHT 端口(process.aria2 对象的 dhtListeningPort 属性)可用 wsock.bt.metadataClient 连接。
BT 端口(process.aria2 对象的 listeningPort 属性)可用 wsock.bt.dhtClient 连接。
*/
Markdown 格式