pyh0n/pyhon/connection/api.py

149 lines
5.6 KiB
Python
Raw Normal View History

2023-02-13 06:11:38 +05:30
import json
import logging
from datetime import datetime
2023-02-13 08:06:09 +05:30
from pyhon import const
2023-04-09 21:43:50 +05:30
from pyhon.appliance import HonAppliance
2023-04-10 00:20:28 +05:30
from pyhon.connection.handler import HonConnectionHandler, HonAnonymousConnectionHandler
2023-02-13 06:11:38 +05:30
_LOGGER = logging.getLogger()
2023-04-09 21:43:50 +05:30
class HonAPI:
2023-04-10 03:17:33 +05:30
def __init__(self, email="", password="", anonymous=False) -> None:
2023-02-13 06:11:38 +05:30
super().__init__()
self._email = email
self._password = password
2023-04-10 03:17:33 +05:30
self._anonymous = anonymous
2023-04-09 21:43:50 +05:30
self._hon = None
2023-04-10 03:17:33 +05:30
self._hon_anonymous = None
2023-02-13 06:11:38 +05:30
async def __aenter__(self):
2023-04-10 00:20:28 +05:30
return await self.create()
2023-02-13 06:11:38 +05:30
async def __aexit__(self, exc_type, exc_val, exc_tb):
2023-04-09 21:43:50 +05:30
await self._hon.close()
2023-02-13 06:11:38 +05:30
2023-04-10 00:20:28 +05:30
async def create(self):
2023-04-10 03:17:33 +05:30
self._hon_anonymous = HonAnonymousConnectionHandler()
if not self._anonymous:
self._hon = await HonConnectionHandler(self._email, self._password).create()
2023-04-10 00:20:28 +05:30
return self
2023-02-13 06:11:38 +05:30
2023-04-10 00:20:28 +05:30
async def load_appliances(self):
2023-04-09 21:43:50 +05:30
async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp:
2023-04-10 00:20:28 +05:30
return await resp.json()
async def load_commands(self, appliance: HonAppliance):
2023-02-13 06:11:38 +05:30
params = {
2023-04-10 00:20:28 +05:30
"applianceType": appliance.appliance_type,
"code": appliance.info["code"],
"applianceModelId": appliance.appliance_model_id,
"firmwareId": appliance.info["eepromId"],
"macAddress": appliance.mac_address,
"fwVersion": appliance.info["fwVersion"],
2023-02-13 06:11:38 +05:30
"os": const.OS,
"appVersion": const.APP_VERSION,
2023-04-10 00:20:28 +05:30
"series": appliance.info["series"],
2023-02-13 06:11:38 +05:30
}
url = f"{const.API_URL}/commands/v1/retrieve"
2023-04-09 21:43:50 +05:30
async with self._hon.get(url, params=params) as response:
2023-02-13 06:11:38 +05:30
result = (await response.json()).get("payload", {})
if not result or result.pop("resultCode") != "0":
return {}
return result
2023-04-10 00:20:28 +05:30
async def command_history(self, appliance: HonAppliance):
url = f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history"
2023-04-09 21:43:50 +05:30
async with self._hon.get(url) as response:
2023-03-11 07:01:56 +05:30
result = await response.json()
if not result or not result.get("payload"):
return {}
return result["payload"]["history"]
2023-04-10 00:20:28 +05:30
async def last_activity(self, appliance: HonAppliance):
2023-03-19 05:38:54 +05:30
url = f"{const.API_URL}/commands/v1/retrieve-last-activity"
2023-04-10 00:20:28 +05:30
params = {"macAddress": appliance.mac_address}
2023-04-09 21:43:50 +05:30
async with self._hon.get(url, params=params) as response:
2023-03-19 05:38:54 +05:30
result = await response.json()
if result and (activity := result.get("attributes")):
return activity
return {}
2023-04-10 00:20:28 +05:30
async def load_attributes(self, appliance: HonAppliance):
2023-02-13 06:11:38 +05:30
params = {
2023-04-10 00:20:28 +05:30
"macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type,
2023-04-10 00:25:36 +05:30
"category": "CYCLE",
2023-02-13 06:11:38 +05:30
}
url = f"{const.API_URL}/commands/v1/context"
2023-04-09 21:43:50 +05:30
async with self._hon.get(url, params=params) as response:
2023-02-13 06:11:38 +05:30
return (await response.json()).get("payload", {})
2023-04-10 00:20:28 +05:30
async def load_statistics(self, appliance: HonAppliance):
2023-02-13 06:11:38 +05:30
params = {
2023-04-10 00:20:28 +05:30
"macAddress": appliance.mac_address,
2023-04-10 00:25:36 +05:30
"applianceType": appliance.appliance_type,
2023-02-13 06:11:38 +05:30
}
url = f"{const.API_URL}/commands/v1/statistics"
2023-04-09 21:43:50 +05:30
async with self._hon.get(url, params=params) as response:
2023-02-13 06:11:38 +05:30
return (await response.json()).get("payload", {})
2023-04-10 00:20:28 +05:30
async def send_command(self, appliance, command, parameters, ancillary_parameters):
2023-02-13 06:11:38 +05:30
now = datetime.utcnow().isoformat()
data = {
2023-04-10 00:20:28 +05:30
"macAddress": appliance.mac_address,
2023-02-13 06:11:38 +05:30
"timestamp": f"{now[:-3]}Z",
"commandName": command,
2023-04-10 00:20:28 +05:30
"transactionId": f"{appliance.mac_address}_{now[:-3]}Z",
"applianceOptions": appliance.commands_options,
"appliance": self._hon.device.get(),
2023-02-13 06:11:38 +05:30
"attributes": {
"channel": "mobileApp",
"origin": "standardProgram",
2023-04-10 00:25:36 +05:30
"energyLabel": "0",
2023-02-13 06:11:38 +05:30
},
"ancillaryParameters": ancillary_parameters,
"parameters": parameters,
2023-04-10 00:25:36 +05:30
"applianceType": appliance.appliance_type,
2023-02-13 06:11:38 +05:30
}
url = f"{const.API_URL}/commands/v1/send"
2023-04-09 21:43:50 +05:30
async with self._hon.post(url, json=data) as resp:
2023-04-10 00:20:28 +05:30
json_data = await resp.json()
if json_data.get("payload", {}).get("resultCode") == "0":
2023-02-13 06:11:38 +05:30
return True
return False
2023-04-09 21:43:50 +05:30
async def appliance_configuration(self):
url = f"{const.API_URL}/config/v1/appliance-configuration"
async with self._hon_anonymous.get(url) as response:
result = await response.json()
if result and (data := result.get("payload")):
return data
return {}
async def app_config(self, language="en", beta=True):
url = f"{const.API_URL}/app-config"
payload = {
"languageCode": language,
"beta": beta,
"appVersion": const.APP_VERSION,
2023-04-10 00:25:36 +05:30
"os": const.OS,
2023-04-09 21:43:50 +05:30
}
2023-04-10 00:25:36 +05:30
payload = json.dumps(payload, separators=(",", ":"))
2023-04-09 21:43:50 +05:30
async with self._hon_anonymous.post(url, data=payload) as response:
if (result := await response.json()) and (data := result.get("payload")):
return data
return {}
async def translation_keys(self, language="en"):
config = await self.app_config(language=language)
if url := config.get("language", {}).get("jsonPath"):
async with self._hon_anonymous.get(url) as response:
if result := await response.json():
return result
return {}
2023-04-10 00:20:28 +05:30
async def close(self):
await self._hon.close()