代码拉取完成,页面将自动刷新
同步操作将从 XiaoSK/TCPHijackingSniffer 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
#coding=gbk
# IDE: PyCharm 3.4.1, Version: Python 2.7.8
# Created in August 2014 by Xiao Shikang
import struct
import Queue
import re
from socket import socket, AF_INET, SOCK_DGRAM, timeout, SOCK_STREAM
from SocketServer import ThreadingUDPServer, BaseRequestHandler, UDPServer
import functools
import time
# =================== User configuration: begin ===================
DEF_PORT = 53  # DNS service port
DEF_TIMEOUT = 1  # timeout setting
MULTI_THREAD = True  # whether to serve requests with multiple threads
DEF_CACHE = False  # whether to cache responses
DEF_CACHE_TTL = 300  # cache lifetime, in seconds
LOCAL_HOST = '127.0.0.1'  # local IP address to bind to
CANONICAL_DNS = '4.2.2.4'  # the more reliable server (more DNS servers are listed at x-free.ml/dns.php)
DOMESTIC_DNS = '114.114.114.114'  # the unreliable server
SECURE_PROTOCOL = 'tcp'  # protocol used for the query when the domain matches a keyword
BLOCKED = ["dropbox"]  # domain keywords that are resolved over TCP
# =================== User configuration: end ===================

# One case-insensitive compiled pattern per keyword in BLOCKED
# (surrounding whitespace in a keyword is ignored).
domain_filter = [re.compile(domain.strip(), re.I) for domain in BLOCKED]
# Check whether a domain name matches one of the filter rules.
def is_match(dm, filters=None):
    """Return the pattern text of the first filter rule matching *dm*.

    dm      -- the domain name to test.
    filters -- optional iterable of compiled regular expressions; when
               omitted, the module-level ``domain_filter`` list is used.

    Returns the ``pattern`` string of the first rule whose ``search``
    succeeds on *dm*, or ``None`` when no rule matches.
    """
    if filters is None:
        # Looked up at call time, so later changes to the module-level
        # list are honoured; no ``global`` is needed for a read.
        filters = domain_filter
    for rule in filters:
        if rule.search(dm):
            return rule.pattern
    return None
# Caching
class Cache(object):
    """Method decorator (descriptor) that optionally routes calls through a cache.

    When the module-level ``DEF_CACHE`` flag is true, calls to the decorated
    method go through ``cache_call``; otherwise they go straight to the
    wrapped function. Subclasses implement the caching policy by overriding
    ``cache_call``.
    """

    def __init__(self, func):
        self.func = func
        # ``__name__`` is the portable spelling of ``func_name``.
        self.cache_name = "_cache_" + func.__name__
        self.ttl = DEF_CACHE_TTL
        # Choose the call target once, at decoration time.
        if DEF_CACHE:
            self.__call__ = self.cache_call
        else:
            self.__call__ = self.func

    def __get__(self, obj, cls=None):
        # Bind the owning object as the first argument, like a bound method.
        return functools.partial(self.__call__, obj)

    def cache_call(self, *args):
        """Default policy: no caching, just call the wrapped function.

        Previously this returned ``None`` without calling anything, so a
        method decorated with the base class lost its result whenever
        caching was switched on.
        """
        return self.func(*args)


class CachedDomain(Cache):
    """Cache for DNS responses, keyed on the question section of the query."""

    def cache_call(self, obj, raw_data):
        """Return the response for *raw_data*, using the cache when fresh.

        obj      -- the object the decorated method is bound to; passed
                    through to the wrapped function.
        raw_data -- the raw DNS query packet.

        Returns the (possibly cached) response, or whatever falsy value the
        wrapped function returned when the lookup failed.
        """
        # The cache lives on the decorator, not on ``obj``: SocketServer
        # builds a new request handler for every request, so a cache stored
        # on the handler would always be empty.
        store = self.__dict__.setdefault(self.cache_name, {})
        now = time.time()
        key = self._question(raw_data)
        entry = store.get(key)
        if entry is not None:
            cached, last_update = entry
            if now - last_update <= self.ttl:
                # A DNS reply must carry the transaction ID of the query it
                # answers (the first two bytes), so splice in this query's.
                return raw_data[:2] + cached[2:]
        value = self.func(obj, raw_data)
        if value:
            # Failed lookups (None / empty) are not worth remembering.
            store[key] = (value, now)
        return value

    def _question(self, data):
        """Return the raw question section (name, QTYPE, QCLASS) of a query.

        Used as the cache key so that e.g. A and AAAA lookups of the same
        name do not share an entry.
        """
        buf = bytearray(data)
        pos = 12  # the question follows the 12-byte DNS header
        while buf[pos] != 0:
            pos += buf[pos] + 1
        # pos is on the terminating zero label; QTYPE and QCLASS follow it.
        return bytes(buf[12:pos + 5])

    def extract_domain(self, data):
        """Return the dotted domain name from the question of a DNS query.

        Raises IndexError when the packet ends before the name does.
        """
        buf = bytearray(data)
        labels = []
        pos = 12  # skip the 12-byte DNS header
        length = buf[pos]
        while length != 0:
            labels.append(bytes(buf[pos + 1:pos + 1 + length]))
            pos += length + 1
            length = buf[pos]
        return b'.'.join(labels)
# Pool of idle, already-connected TCP sockets shared by all handlers.
tcp_pool = Queue.Queue()


# TCP request handling
class TcpHandler(object):
    """Mixin that forwards DNS queries to an upstream server over TCP.

    ``create_tcp_sock`` reads ``self.tcp_dns_server`` (the upstream address),
    so the class mixing this in must set that attribute first.
    """

    # Upper bound on tries per query, so a dead upstream cannot trap the
    # calling thread in an endless retry loop.
    TCP_MAX_ATTEMPTS = 3

    def extract_domain(self, data):
        """Return the dotted domain name from the question of a DNS query.

        Raises IndexError when the packet ends before the name does.
        """
        buf = bytearray(data)
        labels = []
        pos = 12  # skip the 12-byte DNS header
        length = buf[pos]
        while length != 0:
            labels.append(bytes(buf[pos + 1:pos + 1 + length]))
            pos += length + 1
            length = buf[pos]
        return b'.'.join(labels)

    @CachedDomain
    def tcp_response(self, data):
        """Send the DNS query *data* upstream over TCP and return the reply.

        Returns the reply message without its two-byte length prefix, or
        ``None`` when no reply was obtained within ``TCP_MAX_ATTEMPTS`` tries.
        """
        request = self.tcp_packet_head(data) + data
        for attempt in range(self.TCP_MAX_ATTEMPTS):
            sock = None
            try:
                # The first try may reuse a pooled connection; a retry always
                # opens a fresh one, since the pooled one may have gone stale.
                if attempt == 0:
                    sock = self.get_tcp_sock()
                else:
                    sock = self.create_tcp_sock()
                sock.sendall(request)
                head = self._recv_exact(sock, 2)
                if head is not None:
                    size = struct.unpack('!H', head)[0]
                    body = self._recv_exact(sock, size)
                    if body:
                        # Healthy connection: hand it back for reuse.
                        self.release_tcp_sock(sock)
                        return body
            except IOError:
                # socket.timeout and socket.error are IOError subclasses.
                pass
            # Timed out, failed, or closed by the peer: never pool it again.
            if sock is not None:
                sock.close()
        return None

    def _recv_exact(self, sock, size):
        """Read exactly *size* bytes from *sock*.

        Returns the bytes read, or ``None`` if the peer closed the connection
        before *size* bytes arrived. A single ``recv`` may return fewer bytes
        than requested, hence the loop.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def tcp_packet_head(self, data):
        """Return the two-byte big-endian length prefix for *data*."""
        return struct.pack('!H', len(data))

    def packet_body(self, data):
        """Strip the two-byte length prefix from *data* and return the message.

        Raises struct.error when *data* is shorter than two bytes.
        """
        size = struct.unpack('!H', data[0:2])[0]
        return data[2:2 + size]

    def get_tcp_sock(self):
        """Return a pooled socket, or a new one if none frees up in time.

        Waits up to ``DEF_TIMEOUT`` seconds for the pool before connecting.
        """
        try:
            sock = tcp_pool.get(block=True, timeout=DEF_TIMEOUT)
        except Queue.Empty:
            sock = self.create_tcp_sock()
        return sock

    def release_tcp_sock(self, sock):
        """Put *sock* back into the pool, evicting the oldest one if full."""
        try:
            tcp_pool.put(sock, block=False)
        except Queue.Full:
            # Only reachable if the pool is given a maximum size.
            old_sock = tcp_pool.get(block=False)
            old_sock.close()
            tcp_pool.put(sock, block=False)

    def create_tcp_sock(self):
        """Open a TCP connection to ``self.tcp_dns_server`` on ``DEF_PORT``.

        Raises the underlying socket error when the connection fails.
        """
        tcp_sock = socket(AF_INET, SOCK_STREAM)
        try:
            # Set the timeout first so that connect() is bounded as well.
            tcp_sock.settimeout(5)
            tcp_sock.connect((self.tcp_dns_server, DEF_PORT))
        except Exception:
            tcp_sock.close()  # do not leak the descriptor on failure
            raise
        return tcp_sock
# DNS query request handling
class LocalDNSHandler(BaseRequestHandler, TcpHandler):
    """Answer one UDP DNS request by forwarding it to an upstream server.

    Queries whose domain matches a filter rule go through ``match_query``
    (TCP when ``SECURE_PROTOCOL`` is ``'tcp'``, otherwise UDP to
    ``CANONICAL_DNS``); all other queries go over UDP to ``DOMESTIC_DNS``.
    """

    # Seconds to wait on an upstream UDP server.
    UDP_TIMEOUT = 5

    def setup(self):
        """Select the query functions and upstream addresses for this request."""
        if SECURE_PROTOCOL == 'tcp':
            self.match_query = self.tcp_response
        else:
            self.match_query = self._getResponse
        self.query_with_no_match = self.get_response_normal
        self.tcp_dns_server = CANONICAL_DNS
        self.normal_dns_server = (DOMESTIC_DNS, DEF_PORT)
        self.dnsserver = (CANONICAL_DNS, DEF_PORT)

    def handle(self):
        """Resolve the received query upstream and send the reply back."""
        data, client_socket = self.request
        try:
            url = self.extract_domain(data)
        except IndexError:
            # Truncated or malformed packet: nothing sensible to forward.
            return
        if is_match(url):
            resp = self.match_query(data)
        else:
            resp = self.query_with_no_match(data)
        if not resp:
            # The upstream lookup failed; there is nothing to send.
            return
        try:
            client_socket.sendto(resp, 0, self.client_address)
        except IOError:
            # Best effort: the client may be gone (socket.error is an IOError).
            pass

    def _udp_query(self, server, data, failure):
        """Send *data* to *server* over UDP and return the reply.

        server  -- ``(host, port)`` of the upstream DNS server.
        data    -- the raw DNS query packet.
        failure -- value returned when the exchange fails or times out.
        """
        sock = socket(AF_INET, SOCK_DGRAM)
        try:
            sock.settimeout(self.UDP_TIMEOUT)
            sock.connect(server)
            sock.sendall(data)
            return sock.recv(65535)
        except IOError:
            # Covers socket.timeout and socket.error.
            return failure
        finally:
            # Closed on every path, including connect/send failures.
            sock.close()

    @CachedDomain
    def _getResponse(self, data):
        """Query ``self.dnsserver`` over UDP; return ``''`` on failure."""
        return self._udp_query(self.dnsserver, data, '')

    @CachedDomain
    def get_response_normal(self, data):
        """Query ``self.normal_dns_server`` over UDP; return ``None`` on failure."""
        return self._udp_query(self.normal_dns_server, data, None)
# Multi-threaded or single-threaded server, chosen by MULTI_THREAD.
if MULTI_THREAD:
    class LocalDNSServer(ThreadingUDPServer):
        # Worker threads still waiting on an upstream server must not keep
        # the process alive after the main thread is interrupted.
        daemon_threads = True
else:
    class LocalDNSServer(UDPServer):
        pass

# Go!
server = LocalDNSServer((LOCAL_HOST, DEF_PORT), LocalDNSHandler)
print("DNS服务器 v0.1 正在工作,尽情冲浪吧...(按Ctrl+C停止运行)")
try:
    server.serve_forever()
except KeyboardInterrupt:
    # Ctrl+C is the advertised way to stop: exit without a traceback.
    pass
finally:
    # Release the listening socket explicitly.
    server.server_close()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。