aardio 文档

aardio 范例: DHT 客户端

import win.ui;
import win.debounce;
import wsock.bt;
import thread.works;
/*DSG{{*/
var winform = win.form(text="DHT 客户端";right=828;bottom=574)
winform.add(
editDhtNodes={cls="edit";left=14;top=16;right=812;bottom=384;db=1;dl=1;dr=1;dt=1;edge=1;multiline=1;vscroll=1;z=1};
editMetadata={cls="edit";left=14;top=389;right=813;bottom=563;db=1;dl=1;dr=1;edge=1;multiline=1;vscroll=1;z=2};
splitter={cls="splitter";left=14;top=384;right=812;bottom=389;frame=1;horz=1;z=3}
)
/*}}*/

// DHT 路由表
var routingTable = wsock.bt.kBucket("/dht.dat");

// DHT 客户端
var dhtClient,err = wsock.bt.dhtClient("6881-7999",routingTable);
assert(dhtClient,err)

// 多线程下载元数据
var metadataWorker = thread.works( 8,
    function(peersInfo,winform) {

        import wsock.bt.metadataClient;
        import bencode; 

        var exClient = wsock.bt.metadataClient(peersInfo.infoHash,peersInfo.nodeId);
        for(i=1;#peersInfo){ 
            var metadata,err = exClient.getMetadata(peersInfo[i].ip,peersInfo[i].port);

            if(metadata){  

                var metaInfo = bencode.decode(metadata); 
                bencode.save("/"+string.hex(peersInfo.infoHash)+".torrent", {info=metaInfo})

                //可重写为保存到数据库
                ..io.print(i,"获取 torrent 成功:", metaInfo.name);

                winform.onGetMetadataComplete(peersInfo.infoHash,true);
                return; // 成功后即返回,不再尝试其他 peer
            }
            else{
                ..io.stderr.write(err,string.hex(peersInfo.infoHash),'\n')
            }   
        }

        winform.onGetMetadataComplete(peersInfo.infoHash,false);
    },/*waitTime*/,/*init*/,winform
);

/* 
info hash 队列。
多 DHT 客户端可修改为从数据库服务端存取。
*/
var infoHashQueue = {
    count = 0;
    hashes = {};
    getPeersAttempts = {};
    getPeersQueue = [];

    add = function(infoHash){ 
        if(owner.hashes[infoHash]){
            return;
        }

        owner.hashes[infoHash] = "lookup";
        owner.getPeersAttempts[infoHash] = 0;
        table.push(owner.getPeersQueue,infoHash);

        winform.editMetadata.print("发现 Magnet 链接:"
            , "magnet:?xt=urn:btih:" + string.hex(infoHash));

        owner.count = owner.count + 1;
        return true;
    };

    maxAttempts = 20;
    next = function(){  
        var q = owner.getPeersQueue;
        var infoHash = q[1];
        if(!infoHash) return;

        if(
            owner.getPeersAttempts[infoHash]>=owner.maxAttempts 
                && (#q > 1) ){

            ..table.shift(q);
            ..table.push(q,infoHash)

            infoHash = q[1];
        }

        owner.hashes[infoHash] = "busy";
        owner.getPeersAttempts[infoHash] = owner.getPeersAttempts[infoHash] + 1;
        return infoHash,owner.getPeersAttempts[infoHash];
    };

    queriedNodes = ..table.cache();
    getClosestNodes = function(infoHash){
            var queriedNodes = owner.queriedNodes[infoHash]
            if(!queriedNodes){
                queriedNodes = {}
                owner.queriedNodes[infoHash] = queriedNodes;
            }

            //刷新最接近节点
            var closestNodes = routingTable.getClosestNodes(infoHash,50);

            var result = [];

            var limit = 0;

            //排除已查询节点
            for(i, node in closestNodes) { 
                if(!queriedNodes[node.nodeId]){
                queriedNodes[node.nodeId] = 1;
                table.push(result,node);

                limit++;

                if(limit>8) return result; //避免一次发送过多请求
            }
        }

        return result; 
    };  

    peersCache = {};
    delayedMetadataFetch = {};
    onPeersDiscovered = function(infoHash,peers){ 

        // 合并同一个 info hash 的 peers
        owner.peersCache[infoHash] = table.append(owner.peersCache[infoHash]||[],peers);

        // 延时下载
        if(!owner.delayedMetadataFetch[infoHash]){
            var this = owner;
            owner.delayedMetadataFetch[infoHash] = win.debounce( 
                function(){ 
                    var peers = this.peersCache[infoHash];
                    peers.nodeId = dhtClient.nodeId
                    peers.infoHash = infoHash;

                    // 发送给下载线程 
                    metadataWorker.push( peers );

                    winform.editMetadata.print("downloading/peers:"+#peers,string.hex(infoHash));
                    this.peersCache[infoHash] = null;
                },10000
            ); 
        }

        owner.delayedMetadataFetch[infoHash]();

        // 停止发送 get peers ,切换到下一个 infoHash
        owner.stopGetPeers(infoHash);
    };
    stopGetPeers = function(infoHash){
        var idx = table.removeByValue(owner.getPeersQueue,infoHash);

        owner.getPeersAttempts[infoHash] = 0;
        owner.hashes[infoHash] = "metadata";
    };
    complete = function(infoHash,success){
        owner.hashes[infoHash] = success ? "completed" : "failed";;
    };
}

winform.onGetMetadataComplete = function(infoHash,success){
    winform.editMetadata.print(success?"下载成功":"下载失败",string.hex(infoHash));
    infoHashQueue.complete(infoHash,success);
}

// 退出保存 DHT 路由表
winform.onDestroy = function(){  
    routingTable.save(); 
    metadataWorker.quit(5000);
}

dhtClient.onGetPeers = lambda (infoHash) infoHashQueue.add(infoHash); 
dhtClient.onAnnouncePeer = lambda (infoHash) infoHashQueue.add(infoHash); 

var onNodesOrPeersDiscovered = function(nodes,peers,ip,port,infoHash){
    if(peers){
        infoHashQueue.onPeersDiscovered(infoHash,peers);
    }
    elseif(nodes){ 
        for i,node in nodes{
            if routingTable.addNode(node) {
                    winform.editDhtNodes.print("发现新节点:"
                        , node.ip, node.port, "总节点数:", routingTable.getNodeCount());
            } 
        }  
    }   
}

// 定时查找可以获取元数据的 peers
winform.setInterval( 
    function(){ 

        var infoHash,attempts  = infoHashQueue.next()
        if(!infoHash) return;

        io.print("get peers:",attempts,string.hex(infoHash))

        var closestNodes = infoHashQueue.getClosestNodes(infoHash);
            for(i, node in closestNodes) {

            dhtClient.sendGetPeers(node.ip, node.port,infoHash
                ,onNodesOrPeersDiscovered); 
        }

    },1000
) 

// 定时发现新的 DHT 节点
winform.setInterval( 
    function(){
        var closestNodes = routingTable.getClosestNodes(string.random(20), 8);
        for(i, node in closestNodes) { 
            dhtClient.sendFindNode(node.ip, node.port, 
                string.random(20), onNodesOrPeersDiscovered);
        } 
    },1500 //当路由表增大发现新节点速度会越来越快,这时候应当增加延时,减慢速度
) 

// 收到 DHT 请求消息
dhtClient.onQuery = function(ip,port,nodeId){
    if( #nodeId == 20 ){

        var added = routingTable.addNode({
            ip = ip,
            port = port,
            nodeId = nodeId,
            lastSeen = time()
        });

        if(added){
            winform.editDhtNodes.print("添加新节点:", ip, port, "总节点数:", routingTable.getNodeCount());
        }
    }
}

// 定期清理旧节点
winform.setInterval(
    function(){
        routingTable.cleanOldNodes();

        winform.editDhtNodes.print("═════════ 路由表状态 ═════════");
        winform.editDhtNodes.print("总节点数:", routingTable.getNodeCount());
        winform.editDhtNodes.print("InfoHash 数:", infoHashQueue.count );
    },10000
)

winform.splitter.split(winform.editDhtNodes,winform.editMetadata);

if(_STUDIO_INVOKED){io.open(); }

winform.show(); 
win.loopMessage();

/*
可用 process.aria2 启动 BT 下载任务。

aria2 的本机 BT/DHT 服务端口可以通过本机 IP "127.0.0.1" 去连接。 
DHT 端口(process.aria2 对象的 dhtListeningPort 属性)可用 wsock.bt.metadataClient 连接。
BT 端口(process.aria2 对象的 listeningPort 属性)可用 wsock.bt.dhtClient 连接。
*/
Markdown 格式