1 Star 0 Fork 3

纵使有花兼明月何堪无酒亦无人/NetworkHijackingSniffer

forked from XiaoSK/TCPHijackingSniffer 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
domainresolve.py 5.98 KB
一键复制 编辑 原始数据 按行查看 历史
XiaoSK 提交于 2014-08-04 04:54 +08:00 . Fix Unresolved Variables Error
#coding=gbk
# IDE: PyCharm 3.4.1, Version: Python 2.7.8
# Created in August 2014 by Xiao Shikang
import struct
import Queue
import re
from socket import socket, AF_INET, SOCK_DGRAM, timeout, SOCK_STREAM
from SocketServer import ThreadingUDPServer, BaseRequestHandler, UDPServer
import functools
import time
# ===================用户自定义区域 开始===================
DEF_PORT = 53 # DNS服务端口
DEF_TIMEOUT = 1 # 超时设置
MULTI_THREAD = True # 是否多线程
DEF_CACHE = False # 是否缓存
DEF_CACHE_TTL = 300 # 单位为秒
LOCAL_HOST = '127.0.0.1' # 本地所绑IP
CANONICAL_DNS = '4.2.2.4' # 较可靠的服务器(可在x-free.ml/dns.php找到更多DNS服务器)
DOMESTIC_DNS = '114.114.114.114' # 不可靠的服务器
SECURE_PROTOCOL = 'tcp' # 若域名匹配关键字,查询所用的协议
BLOCKED = ["dropbox"] # 使用TCP协议的域名关键字
# ===================用户自定义区域 结束===================
domain_filter = []
for domain in BLOCKED:
domain_filter.append(re.compile(domain.strip(), re.I))
# 判断域名是否符合过滤规则
def is_match(dm):
global domain_filter
for item in domain_filter:
if item.search(dm):
return item.pattern
return None
# 缓存
class Cache(object):
def __init__(self, func):
self.func = func
self.cache_name = "_cache_" + func.func_name
self.ttl = DEF_CACHE_TTL
if DEF_CACHE:
self.__call__ = self.cache_call
else:
self.__call__ = self.func
def __get__(self, obj, cls=None):
return functools.partial(self.__call__, obj)
def cache_call(self, *args):
pass
class CachedDomain(Cache):
def cache_call(self, obj, raw_data):
cache = obj.__dict__.setdefault(self.cache_name, {})
now = time.time()
url = self.extract_domain(raw_data)
try:
value, last_update = cache[url]
if now - last_update > self.ttl:
raise AttributeError
except (KeyError, AttributeError):
value = self.func(obj, raw_data)
cache[url] = (value, now)
return value
def extract_domain(self, data):
data = data[12:]
url = []
start = 0
len_of_data = ord(data[start])
while len_of_data != 0:
part = str(data[start + 1:start + 1 + len_of_data])
url.append(part)
start = start + len_of_data + 1
len_of_data = ord(data[start])
url = '.'.join(url)
return url
tcp_pool = Queue.Queue()
# TCP请求处理类
class TcpHandler(object):
def extract_domain(self, data):
data = data[12:]
url = []
start = 0
len_of_data = ord(data[start])
while len_of_data != 0:
part = str(data[start + 1:start + 1 + len_of_data])
url.append(part)
start = start + len_of_data + 1
len_of_data = ord(data[start])
url = '.'.join(url)
return url
@CachedDomain
def tcp_response(self, data):
resp = None
while not resp:
sock = self.get_tcp_sock()
size_data = self.tcp_packet_head(data)
sock.send(size_data + data)
try:
resp = sock.recv(1024)
except timeout:
sock = self.create_tcp_sock()
self.release_tcp_sock(sock)
return self.packet_body(resp)
def tcp_packet_head(self, data):
size = len(data)
return struct.pack('!H', size)
def packet_body(self, data):
size = struct.unpack('!H', data[0:2])[0]
return data[2:2 + size]
def get_tcp_sock(self):
try:
sock = tcp_pool.get(block=True, timeout=DEF_TIMEOUT)
except Queue.Empty:
sock = self.create_tcp_sock()
return sock
def release_tcp_sock(self, sock):
try:
tcp_pool.put(sock, block=False)
except Queue.Full:
old_sock = tcp_pool.get(block=False)
old_sock.close()
tcp_pool.put(sock, block=False)
def create_tcp_sock(self):
tcp_sock = socket(AF_INET, SOCK_STREAM)
tcp_sock.connect((self.tcp_dns_server, DEF_PORT))
tcp_sock.settimeout(5)
return tcp_sock
# DNS解析请求处理
class LocalDNSHandler(BaseRequestHandler, TcpHandler):
def setup(self):
if SECURE_PROTOCOL == 'tcp':
self.match_query = self.tcp_response
else:
self.match_query = self._getResponse
self.query_with_no_match = self.get_response_normal
self.tcp_dns_server = CANONICAL_DNS
self.normal_dns_server = (DOMESTIC_DNS, DEF_PORT)
self.dnsserver = (CANONICAL_DNS, DEF_PORT)
def handle(self):
data, client_socket = self.request
url = self.extract_domain(data)
if is_match(url):
resp = self.match_query(data)
else:
resp = self.query_with_no_match(data)
try:
client_socket.sendto(resp, 0, self.client_address)
except StandardError:
pass
@CachedDomain
def _getResponse(self, data):
sock = socket(AF_INET, SOCK_DGRAM)
sock.connect(self.dnsserver)
sock.sendall(data)
sock.settimeout(5)
rspdata = None
try:
rspdata = sock.recv(65535)
except Exception:
rspdata = ''
finally:
sock.close()
return rspdata
@CachedDomain
def get_response_normal(self, data):
sock = socket(AF_INET, SOCK_DGRAM)
sock.connect(self.normal_dns_server)
sock.sendall(data)
sock.settimeout(5)
resp = None
try:
resp = sock.recv(65535)
except Exception:
pass
finally:
sock.close()
return resp
# 多线程/单线程
if MULTI_THREAD:
class LocalDNSServer(ThreadingUDPServer):
pass
else:
class LocalDNSServer(UDPServer):
pass
# Go!
server = LocalDNSServer((LOCAL_HOST, DEF_PORT), LocalDNSHandler)
print "DNS服务器 v0.1 正在工作,尽情冲浪吧...(按Ctrl+C停止运行)"
server.serve_forever()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/xavier007/NetworkHijackingSniffer.git
git@gitee.com:xavier007/NetworkHijackingSniffer.git
xavier007
NetworkHijackingSniffer
NetworkHijackingSniffer
master

搜索帮助