1 Star 0 Fork 0

chuck/MiChatGUI

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
miaccount.py 5.52 KB
一键复制 编辑 原始数据 按行查看 历史
wxtcstt 提交于 2023-03-28 22:46 +08:00 . Add files via upload
import base64
import hashlib
import json
import logging
import os
import random
import string
from urllib import parse
from aiohttp import ClientSession
_LOGGER = logging.getLogger(__package__)
def get_random(length):
return "".join(random.sample(string.ascii_letters + string.digits, length))
class MiTokenStore:
def __init__(self, token_path):
self.token_path = token_path
def load_token(self):
if os.path.isfile(self.token_path):
try:
with open(self.token_path) as f:
return json.load(f)
except Exception:
_LOGGER.exception("Exception on load token from %s", self.token_path)
return None
def save_token(self, token=None):
if token:
try:
with open(self.token_path, "w") as f:
json.dump(token, f, indent=2)
except Exception:
_LOGGER.exception("Exception on save token to %s", self.token_path)
elif os.path.isfile(self.token_path):
os.remove(self.token_path)
class MiAccount:
def __init__(self, session: ClientSession, username, password, token_store=None):
self.session = session
self.username = username
self.password = password
self.token_store = (
MiTokenStore(token_store) if isinstance(token_store, str) else token_store
)
self.token = token_store is not None and self.token_store.load_token()
async def login(self, sid):
if not self.token:
self.token = {"deviceId": get_random(16).upper()}
try:
resp = await self._serviceLogin(f"serviceLogin?sid={sid}&_json=true")
if resp["code"] != 0:
data = {
"_json": "true",
"qs": resp["qs"],
"sid": resp["sid"],
"_sign": resp["_sign"],
"callback": resp["callback"],
"user": self.username,
"hash": hashlib.md5(self.password.encode()).hexdigest().upper(),
}
resp = await self._serviceLogin("serviceLoginAuth2", data)
if resp["code"] != 0:
raise Exception(resp)
self.token["userId"] = resp["userId"]
self.token["passToken"] = resp["passToken"]
serviceToken = await self._securityTokenService(
resp["location"], resp["nonce"], resp["ssecurity"]
)
self.token[sid] = (resp["ssecurity"], serviceToken)
if self.token_store:
self.token_store.save_token(self.token)
return True
except Exception as e:
self.token = None
if self.token_store:
self.token_store.save_token()
_LOGGER.exception("Exception on login %s: %s", self.username, e)
return False
async def _serviceLogin(self, uri, data=None):
headers = {
"User-Agent": "APP/com.xiaomi.mihome APPV/6.0.103 iosPassportSDK/3.9.0 iOS/14.4 miHSTS"
}
cookies = {"sdkVersion": "3.9", "deviceId": self.token["deviceId"]}
if "passToken" in self.token:
cookies["userId"] = self.token["userId"]
cookies["passToken"] = self.token["passToken"]
url = "https://account.xiaomi.com/pass/" + uri
async with self.session.request(
"GET" if data is None else "POST",
url,
data=data,
cookies=cookies,
headers=headers,
ssl = False,
) as r:
raw = await r.read()
resp = json.loads(raw[11:])
_LOGGER.debug("%s: %s", uri, resp)
return resp
async def _securityTokenService(self, location, nonce, ssecurity):
nsec = "nonce=" + str(nonce) + "&" + ssecurity
clientSign = base64.b64encode(hashlib.sha1(nsec.encode()).digest()).decode()
async with self.session.get(
location + "&clientSign=" + parse.quote(clientSign)
) as r:
serviceToken = r.cookies["serviceToken"].value
if not serviceToken:
raise Exception(await r.text())
return serviceToken
async def mi_request(self, sid, url, data, headers, relogin=True):
if (self.token and sid in self.token) or await self.login(sid): # Ensure login
cookies = {
"userId": self.token["userId"],
"serviceToken": self.token[sid][1],
}
content = data(self.token, cookies) if callable(data) else data
method = "GET" if data is None else "POST"
_LOGGER.info("%s %s", url, content)
async with self.session.request(
method, url, data=content, cookies=cookies, headers=headers
) as r:
status = r.status
if status == 200:
resp = await r.json(content_type=None)
code = resp["code"]
if code == 0:
return resp
if "auth" in resp.get("message", "").lower():
status = 401
else:
resp = await r.text()
if status == 401 and relogin:
_LOGGER.warn("Auth error on request %s %s, relogin...", url, resp)
self.token = None # Auth error, reset login
return await self.mi_request(sid, url, data, headers, False)
else:
resp = "Login failed"
raise Exception(f"Error {url}: {resp}")
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/chuck_chen/MiChatGUI.git
git@gitee.com:chuck_chen/MiChatGUI.git
chuck_chen
MiChatGUI
MiChatGUI
main

搜索帮助