diff --git a/resources/language/resource.language.en_gb/strings.po b/resources/language/resource.language.en_gb/strings.po index fcce413..8f7c72c 100644 --- a/resources/language/resource.language.en_gb/strings.po +++ b/resources/language/resource.language.en_gb/strings.po @@ -64,6 +64,14 @@ msgctxt "#30204" msgid "Help 30204" msgstr "" +msgctxt "#30211" +msgid "Live endpoint" +msgstr "" + +msgctxt "#30212" +msgid "Help 30212" +msgstr "" + msgctxt "#30205" msgid "Use login credentials" msgstr "" diff --git a/resources/language/resource.language.fr_fr/strings.po b/resources/language/resource.language.fr_fr/strings.po index 22e8f7c..77f4163 100644 --- a/resources/language/resource.language.fr_fr/strings.po +++ b/resources/language/resource.language.fr_fr/strings.po @@ -62,7 +62,15 @@ msgstr "Fournisseur d'accès à internet" msgctxt "#30204" msgid "Help 30204" -msgstr "Sélectionner votre fournisser" +msgstr "Sélectionner votre fournisseur" + +msgctxt "#30211" +msgid "Live endpoint" +msgstr "Endpoint pour le live" + +msgctxt "#30212" +msgid "Help 30212" +msgstr "Sélectionner l'endpoint pour le live" msgctxt "#30205" msgid "Use login credentials" diff --git a/resources/lib/providers/abstract_orange_provider.py b/resources/lib/providers/abstract_orange_provider.py index bc00254..ae7ac5b 100644 --- a/resources/lib/providers/abstract_orange_provider.py +++ b/resources/lib/providers/abstract_orange_provider.py @@ -1,38 +1,34 @@ -# ruff: noqa: D102 +# ruff: noqa: D102,I001 """Orange provider template.""" import json import re from abc import ABC -from datetime import date, datetime, timedelta, timezone -from time import strptime +from datetime import date, datetime, timedelta from typing import List from urllib.parse import urlencode import xbmc +import xbmcvfs +import xbmcaddon + from requests import Session from requests.exceptions import JSONDecodeError, RequestException -from lib.exceptions import AuthenticationRequired, StreamDataDecodeError, StreamNotIncluded, StreamRequestException +from lib.exceptions import AuthenticationRequired, StreamDataDecodeError, StreamNotIncluded from lib.providers.abstract_provider import AbstractProvider -from lib.utils.kodi import build_addon_url, get_addon_setting, get_drm, get_global_setting, log, set_addon_setting -from lib.utils.request import get_random_ua, request, request_json - -_PROGRAMS_ENDPOINT = "https://rp-ott-mediation-tv.woopic.com/api-gw/live/v3/applications/STB4PC/programs?period={period}&epgIds=all&mco={mco}" -_CATCHUP_CHANNELS_ENDPOINT = "https://rp-ott-mediation-tv.woopic.com/api-gw/catchup/v4/applications/PC/channels" -_CATCHUP_ARTICLES_ENDPOINT = "https://rp-ott-mediation-tv.woopic.com/api-gw/catchup/v4/applications/PC/channels/{channel_id}/categories/{category_id}" -_CATCHUP_VIDEOS_ENDPOINT = "https://rp-ott-mediation-tv.woopic.com/api-gw/catchup/v4/applications/PC/groups/{group_id}" -_CHANNELS_ENDPOINT = "https://rp-ott-mediation-tv.woopic.com/api-gw/pds/v1/live/ew?everywherePopulation=OTT_Metro" +from lib.utils.kodi import build_addon_url, get_addon_setting, get_drm, get_global_setting, log +from lib.utils.request import request, request_json -_LIVE_STREAM_ENDPOINT = "https://mediation-tv.orange.fr/all/api-gw/stream/v2/auth/accountToken/live/{stream_id}?deviceModel=WEB_PC&customerOrangePopulation=OTT_Metro" -_CATCHUP_STREAM_ENDPOINT = "https://mediation-tv.orange.fr/all/api-gw/catchup/v4/auth/accountToken/applications/PC/videos/{stream_id}/stream?terminalModel=WEB_PC&terminalId=" -_STREAM_LOGO_URL = "https://proxymedia.woopic.com/api/v1/images/2090{path}" -_HOMEPAGE_URL = "https://tv.orange.fr/" -_LOGIN_URL = "https://login.orange.fr" +WEBAPP_PUBLIC_URL = "https://tv.orange.fr" +IDME_URL = 'https://login.orange.fr' -_LICENSE_ENDPOINT = "https://mediation-tv.orange.fr/all/api-gw/license/v1/auth/accountToken" +## old EPG endpoint +PROGRAMS_ENDPOINT = "https://rp-ott-mediation-tv.woopic.com/api-gw/live/v3/applications/STB4PC/programs?period={period}&epgIds=all&mco={mco}" +## new EPG endpoint +# BFF_EPG_GRID_URL = "/bff-guidetv-epggrid/v1/auth/accountToken/guidetv/epggrid/epggrid-tv-grid" class AbstractOrangeProvider(AbstractProvider, ABC): """Abstract Orange Provider.""" @@ -41,32 +37,167 @@ class AbstractOrangeProvider(AbstractProvider, ABC): mco = "OFR" groups = {} + def __init__(self): + """Fetch auth data from home page if needed and build __pinia and __config variables.""" + headers = {} + profile_path = xbmcaddon.Addon().getAddonInfo('profile') + pinia_file = f'{profile_path}__pinia.json' + config_file = f'{profile_path}__config.json' + + if xbmcvfs.exists(pinia_file): + with xbmcvfs.File(pinia_file) as f: + self.__pinia = json.load(f) + + with xbmcvfs.File(config_file) as f: + self.__config = json.load(f) + + self._update_config_file(config_file) + + if self.__pinia["tv_token_expires"] > datetime.now().timestamp(): + return + + wassup_expires = self.__pinia['wassup_expires'] + if not wassup_expires or wassup_expires > datetime.now().timestamp(): + wassup = self.__pinia["wassup"] + headers = {"Cookie": f"wassup={wassup}"} + + response = request("GET", WEBAPP_PUBLIC_URL, headers=headers) + self.__pinia = json.loads(re.search('window.__pinia = ({.*?});', response.text).group(1)) + + if not self.__pinia['authStore']['isAuthenticated']: + log("Not on Orange network, login required", xbmc.LOGINFO) + if not get_addon_setting("provider.use_credentials", bool): + raise AuthenticationRequired("Please provide and use your credentials") + wassup = self._login() + headers = {"Cookie": f"wassup={wassup}"} + response = request("GET", WEBAPP_PUBLIC_URL, headers=headers) + self.__pinia = json.loads(re.search('window.__pinia = ({.*?});', response.text).group(1)) + self.__pinia["wassup"] = wassup + + for cookie in response.cookies: + if cookie.name == 'wassup': + self.__pinia["wassup"] = cookie.value + self.__pinia["wassup_expires"] = cookie.expires # None on Orange network + break + + two_days = 48 * 60 * 60 + self.__pinia["tv_token_expires"] = datetime.now().timestamp() + two_days + + with xbmcvfs.File(pinia_file, 'w') as f: + json.dump(self.__pinia, f) + + if "_AbstractOrangeProvider__config" not in self.__dict__: + self.__config = json.loads(re.search('window.__config = ({.*?});', response.text).group(1)) + + self._update_config_file(config_file) + + def _update_config_file(self, config_file: str): + """Update __config file if needed.""" + live_keys = { + "live": "LIVE_STREAM_URL", + "livecontrol": "LIVE_STREAM_STARTOVER_URL" + } + live_endpoint = get_addon_setting("provider.live") + + if not xbmcvfs.exists(config_file) or self.__config["LIVE_KEY"] != live_keys[live_endpoint]: + self.__config["LIVE_KEY"] = live_keys[live_endpoint] + + self.__config["PARAMS"] = "&".join([ + f'appVersion={self.__pinia["appStore"]["appVersion"]}', + f'deviceModel={self.__config["BFF_DEVICE_MODEL"]}', + f'deviceCategory={self.__config["BFF_DEVICE_CATEGORY"]}', + f'customerOrangePopulation={self.__pinia["userStore"]["rights"]["customerOrangePopulation"]}', + f'customerCanalPopulation={self.__pinia["userStore"]["rights"]["customerCanalPopulation"]}', + 'consentPersoStatus=0', + ]) + + with xbmcvfs.File(config_file, 'w') as f: + json.dump(self.__config, f) + + def _login(self): + """Login to Orange to get wassup cookie.""" + session = Session() + session.headers = { + 'Accept': 'application/json', + 'Accept-Encoding': 'gzip, deflate, br, zstd', + 'Content-Type': 'application/json', + } + + try: + request("POST", f"{IDME_URL}/api/access", data='{}', session=session) + except RequestException: + log("Error while authenticating (access)", xbmc.LOGWARNING) + return + + try: + data = json.dumps({"login": get_addon_setting("provider.username"), "loginOrigin": "input"}) + request("POST", f"{IDME_URL}/api/login", data=data, session=session) + except RequestException: + log("Error while authenticating (login)", xbmc.LOGWARNING) + return + + try: + data = json.dumps({"password": get_addon_setting("provider.password"), "remember": True}) + request("POST", f"{IDME_URL}/api/password", data=data, session=session) + except RequestException: + log("Error while authenticating (password)", xbmc.LOGWARNING) + return + + wassup = session.cookies.get('wassup') + if not wassup: + log("Error while authenticating (wassup not found)", xbmc.LOGWARNING) + + return wassup + + def _get_auth_headers(self) -> dict: + return { + "Cookie": f"wassup={self.__pinia['wassup']}", + "tv_token": f"Bearer {self.__pinia['authStore']['authInitEw']['token']}" + } + def get_live_stream_info(self, stream_id: str) -> dict: """Get live stream info.""" - return self._get_stream_info(_LIVE_STREAM_ENDPOINT, stream_id) + stream_id_parts = stream_id.split("|") + four_hours = 4 * 60 * 60 + if len(stream_id_parts) == 1: + start = four_hours + else: + start = four_hours - (datetime.now().timestamp() - int(stream_id_parts[1])) + if start < 1: + start = 1 + live_key = self.__config["LIVE_KEY"] + url = ( + f'{self.__config["TV_GW_BASE_URL"]}/{self.__config[live_key]}/{stream_id_parts[0]}' + f'?{self.__config["PARAMS"]}' + ) + return self._get_stream_info(url, start) def get_catchup_stream_info(self, stream_id: str) -> dict: """Get catchup stream info.""" - return self._get_stream_info(_CATCHUP_STREAM_ENDPOINT, stream_id) + url = ( + f'{self.__config["TV_GW_BASE_URL"]}/{self.__config["REPLAY_AUTH_URL"]}/PC/videos/{stream_id}/stream' + f'?terminalModel={self.__config["REPLAY_TERMINAL_MODEL"]}&terminalId=' + ) + return self._get_stream_info(url) def get_streams(self) -> list: """Load stream data from Orange and convert it to JSON-STREAMS format.""" - # @todo: use new API to check if channel is part of subscription - channels = request_json(_CHANNELS_ENDPOINT, default={"channels": {}})["channels"] - channels.sort(key=lambda channel: channel["displayOrder"]) - + headers = self._get_auth_headers() + url = f'{self.__config["TV_GW_BASE_URL"]}/{self.__config["LIVE_SERVICE_PLAN_URL"]}?{self.__config["PARAMS"]}' + channels = request_json(url, headers=headers)["channels"] log(f"{len(channels)} channels found", xbmc.LOGINFO) + # log(f"channels : {channels}", xbmc.LOGINFO) return [ { - "id": str(channel["idEPG"]), + "id": str(channel["epgId"]), "name": channel["name"], - "preset": str(channel["displayOrder"]), - "logo": self._extract_logo(channel["logos"]), - "stream": build_addon_url(f"/stream/live/{self._get_channel_live_id(channel)}"), - "group": [group_name for group_name in self.groups if int(channel["idEPG"]) in self.groups[group_name]], + "preset": channel["lcn"], + "logo": channel["logos"][0]['logoImageUrl'] + '|verifypeer=false', + "stream": build_addon_url(f'/stream/live/{channel["externalId"]}'), + "group": [group_name for group_name in self.groups if channel["epgId"] in self.groups[group_name]], } - for channel in channels + for channel in channels if channel['subscribed'] and channel["externalId"] != 'livetv_acces_limite_ctv' ] def get_epg(self) -> dict: @@ -116,6 +247,7 @@ def get_epg(self) -> dict: "description": program["synopsis"], "genre": program["genre"] if program["genreDetailed"] is None else program["genreDetailed"], "image": image, + "stream": build_addon_url(f'/stream/live/{program["externalId"]}|{program["diffusionDate"]}'), } ) @@ -129,152 +261,149 @@ def get_catchup_items(self, levels: List[str]) -> list: self._get_catchup_categories, self._get_catchup_articles, self._get_catchup_videos, + self._get_catchup_video, ] return item_getters[depth](*levels) def _get_catchup_channels(self) -> list: """Load available catchup channels.""" - channels = request_json(_CATCHUP_CHANNELS_ENDPOINT, default=[]) + headers = self._get_auth_headers() + url = ( + f'{self.__config["TV_GW_BASE_URL"]}/{self.__config["BFF_REPLAY_LISTING_CHANNELS_URL"]}' + f'?{self.__config["PARAMS"]}' + ) + channels = request_json(url, headers=headers)['page']['sections'][1]['items'] + # log(f"channels : {channels}", xbmc.LOGINFO) return [ { "is_folder": True, - "label": str(channel["name"]).upper(), - "path": build_addon_url(f"/catchup/{channel['id']}"), - "art": {"thumb": channel["logos"]["ref_millenials_partner_white_logo"]}, + "label": channel["titleText"], + "path": build_addon_url( + f"/catchup/{channel['events']['onClick']['track']['trackParams'][-2]['value']}" + ), + "art": {"thumb": channel["titleLogoImageUrl"] + '|verifypeer=false'}, } - for channel in channels + for channel in channels if 'rightTag' not in channel ] def _get_catchup_categories(self, channel_id: str) -> list: """Return a list of catchup categories for the specified channel id.""" - url = f"{_CATCHUP_CHANNELS_ENDPOINT}/{channel_id}" - categories = request_json(url, default={"categories": {}})["categories"] + headers = self._get_auth_headers() + url = ( + f'{self.__config["TV_GW_BASE_URL"]}/{self.__config["BFF_REPLAY_LANDING_CHANNEL_URL"]}' + f'?{self.__config["PARAMS"]}&channelId={channel_id}' + ) + categories = request_json(url, headers=headers)['page']['sections'][1:] + # log(f"categories : {categories}", xbmc.LOGINFO) return [ { "is_folder": True, - "label": category["name"][0].upper() + category["name"][1:], - "path": build_addon_url(f"/catchup/{channel_id}/{category['id']}"), + "label": category['title']['text'], + "path": build_addon_url( + f"/catchup/{channel_id}/{category['extraLink']['events']['onClick']['navigate']['route'].split('/')[-1]}" + ), } for category in categories ] def _get_catchup_articles(self, channel_id: str, category_id: str) -> list: """Return a list of catchup groups for the specified channel id and category id.""" - url = _CATCHUP_ARTICLES_ENDPOINT.format(channel_id=channel_id, category_id=category_id) - articles = request_json(url, default={"articles": {}})["articles"] + headers = self._get_auth_headers() + url = ( + f'{self.__config["TV_GW_BASE_URL"]}/{self.__config["BFF_REPLAY_LISTING_CHANNEL_CATEGORY_URL"]}' + f'?{self.__config["PARAMS"]}&channelId={channel_id}&categoryId={category_id}' + ) + articles = request_json(url, headers=headers)['page']['sections'][1]['items'] + # log(f"articles : {articles}", xbmc.LOGINFO) + + table = [] + for article in articles: + route = article['events']['onClick']['navigate']['route'].split('/') + if route[-2] == 'videos': + path = f"/catchup/{channel_id}/{category_id}/{route[-1]}/{route[-1]}" + else: + path = f"/catchup/{channel_id}/{category_id}/{route[-1]}" + table.append( + { + "is_folder": True, + "label": article["titleText"], + "path": build_addon_url(path), + "art": {"poster": article["backgroundImageUrl"] + '|verifypeer=false'}, + } + ) + + return table + + def _get_catchup_videos(self, channel_id: str, category_id: str, article_id: str) -> list: + """Return a list of catchup videos for the specified channel id, category id and article id.""" + headers = self._get_auth_headers() + url = ( + f'{self.__config["TV_GW_BASE_URL"]}/{self.__config["BFF_REPLAY_FIP_URL"]}/fip-replay-serial' + f'?{self.__config["PARAMS"]}&groupId={article_id}' + ) + videos = request_json(url, headers=headers)['page']['sections'][2]['items'] + # log(f"videos : {videos}", xbmc.LOGINFO) + + if len(videos) == 1: + return self._get_catchup_video(channel_id, category_id, article_id, self._get_video_id(videos[0])) return [ { "is_folder": True, - "label": article["title"], - "path": build_addon_url(f"/catchup/{channel_id}/{category_id}/{article['id']}"), - "art": {"poster": article["covers"]["ref_16_9"]}, + "label": video["titleText"], + "path": build_addon_url( + f"/catchup/{channel_id}/{category_id}/{article_id}/{self._get_video_id(video)}" + ), + "art": {"poster": video["backgroundImageUrl"] + '|verifypeer=false'}, + "info": { + "plot": f'[B]{video["titleText"]}[/B]\n{video["mainText"]["text"]}', + }, } - for article in articles + for video in videos ] - def _get_catchup_videos(self, channel_id: str, category_id: str, article_id: str) -> list: - """Return a list of catchup videos for the specified channel id and article id.""" - url = _CATCHUP_VIDEOS_ENDPOINT.format(group_id=article_id) - videos = request_json(url, default={"videos": {}})["videos"] + def _get_catchup_video(self, channel_id: str, category_id: str, article_id: str, video_id: str) -> dict: + """Return a catchup video for the article id.""" + headers = self._get_auth_headers() + url = ( + f'{self.__config["TV_GW_BASE_URL"]}/{self.__config["BFF_REPLAY_FIP_URL"]}/fip-replay-unit' + f'?{self.__config["PARAMS"]}&videoId={video_id}' + ) + video = request_json(url, headers=headers)['page']['sections'] + # log(f"video : {video}", xbmc.LOGINFO) return [ { "is_folder": False, - "label": video["title"], - "path": build_addon_url(f"/stream/catchup/{video['id']}"), - "art": {"poster": video["covers"]["ref_16_9"]}, + "label": video[1]['title']['text'], + "path": build_addon_url(f"/stream/catchup/{video_id}"), + "art": {"poster": video[1]["backgroundImageUrl"] + '|verifypeer=false'}, "info": { - "duration": int(video["duration"]) * 60, - "genres": video["genres"], - "plot": video["longSummary"], - "premiered": datetime.fromtimestamp(int(video["broadcastDate"]) / 1000).strftime("%Y-%m-%d"), - "year": int(video["productionDate"]), + # "duration": int(video["duration"]) * 60, + # "genres": video["genres"], + "plot": ( + f'[B]{video[1]["information"]["text"]}[/B]\n' + f'[I]{video[1]["footNote"]}[/I]\n' + f'{video[2]["items"][0]["paragraphs"][0]["text"]}' + ), + # "premiered": datetime.fromtimestamp(int(video["broadcastDate"]) / 1000).strftime("%Y-%m-%d"), + # "year": int(video["productionDate"]), }, } - for video in videos ] - def _get_stream_info(self, stream_endpoint: str, stream_id: str) -> dict: - """Load stream info from Orange.""" - stream_endpoint_url = stream_endpoint.format(stream_id=stream_id) - now = datetime.now(timezone.utc) - session_data = get_addon_setting("provider.session_data", dict) - - session = Session() - session.headers = { - "Accept": "application/xhtml+xml,application/xml", - "Content-Type": "application/json", - "User-Agent": get_random_ua(), - } - - if self._is_session_data_valid(session_data, now): - try: - stream_info = self._request_stream_info(stream_endpoint_url, session_data) - except StreamRequestException: - if get_addon_setting("provider.use_credentials", bool): - self._login(session) - - session_data = self._refresh_session_data(session, now) - stream_info = self._request_stream_info(stream_endpoint_url, session_data) - return stream_info - - if get_addon_setting("provider.use_credentials", bool): - self._login(session) - - session_data = self._refresh_session_data(session, now) - stream_info = self._request_stream_info(stream_endpoint_url, session_data) - - return stream_info - - def _is_session_data_valid(self, session_data: dict, at: datetime = None) -> bool: - """Check if session data is valid.""" - if not session_data.get("tv_token") or not session_data.get("wassup"): - return False - - if at is None: - at = datetime.now(timezone.utc) - - if not session_data.get("tv_token_expires") or at.timestamp() > session_data.get("tv_token_expires"): - return False - - try: - decoded_wassup = bytes.fromhex(session_data.get("wassup")).decode() - xwvd = re.search("\|X_WASSUP_VALID_DATE=(.*?)\|", decoded_wassup).group(1) - wassup_expires_at = datetime(*(strptime(xwvd, "%Y%m%d%H%M%S")[0:6])).replace(tzinfo=timezone.utc) - return wassup_expires_at > at - except (TypeError, AttributeError): - return False - - def _refresh_session_data(self, session: Session, now: datetime) -> dict: - """Fetch session data from home page.""" - try: - response = request("GET", _HOMEPAGE_URL, session=session) - session_data = { - "tv_token": json.loads(re.search('"token":(".*?")', response.text).group(1)), - "tv_token_expires": now.timestamp() + 30 * 60, - "wassup": session.cookies.get("wassup"), - } - except RequestException as e: - raise AuthenticationRequired("Cannot initiate new session (request failed)") from e - except AttributeError as e: - raise AuthenticationRequired("Cannot initiate new session (tv token not found)") from e - except JSONDecodeError as e: - raise StreamDataDecodeError("Cannot initiate new session (tv token not loaded") from e - - set_addon_setting("provider.session_data", dict(session_data)) - return session_data + def _get_video_id(self, video: dict) -> str: + for item in video['events']['onClick']['track']['trackParams']: + if item["key"] == "strip_item_id": + return item["value"] - def _request_stream_info(self, stream_endpoint_url: str, session_data: dict) -> dict: - """Load stream data from Orange.""" + def _get_stream_info(self, stream_endpoint_url: str, start: float = 0) -> dict: + """Load stream info from Orange.""" + headers = self._get_auth_headers() try: - headers = { - "tv_token": f"Bearer {session_data.get('tv_token')}", - "Cookie": f"wassup={session_data.get('wassup')}", - } res = request("GET", stream_endpoint_url, headers=headers) stream = res.json() except RequestException as e: @@ -285,14 +414,18 @@ def _request_stream_info(self, stream_endpoint_url: str, session_data: dict) -> except JSONDecodeError as e: raise StreamDataDecodeError() from e - return self._format_stream_info(stream, session_data) + return self._format_stream_info(stream, start) - def _format_stream_info(self, stream: dict, session_data: dict) -> dict: + def _format_stream_info(self, stream: dict, start: float) -> dict: """Compute stream info.""" + headers = self._get_auth_headers() protectionData = stream.get("protectionData") or stream.get("protectionDatas") path = stream.get("streamURL") or stream.get("url") - license_server_url = _LICENSE_ENDPOINT if stream.get("url") is None else "" + license_server_url = ( + f'{self.__config["TV_GW_BASE_URL"]}/{self.__config["STREAM_LICENSE_AUTH_URL"]}' + if stream.get("url") is None else "" + ) for system in protectionData: if system.get("keySystem") == get_drm(): @@ -301,65 +434,24 @@ def _format_stream_info(self, stream: dict, session_data: dict) -> dict: stream_info = { "path": path, "protocol": "mpd", - "mime_type": "application/xml+dash", + "mime_type": "application/dash+xml", "drm_config": { "license_type": get_drm(), "license_key": "|".join( { "licence_server_url": license_server_url, - "headers": urlencode( - { - "tv_token": f"Bearer {session_data.get('tv_token')}", - "Content-Type": "", - "Cookie": f"wassup={session_data.get('wassup')}", - } - ), + "headers": urlencode({"Content-Type": "", **headers}), "post_data": "R{SSM}", "response_data": "", }.values() ), }, + "start": start, } log(stream_info, xbmc.LOGDEBUG) return stream_info - def _login(self, session: Session): - """Login to Orange.""" - try: - data = json.dumps({}) - request("POST", f"{_LOGIN_URL}/api/access", data=data, session=session) - except RequestException: - log("Error while authenticating (init)", xbmc.LOGWARNING) - return - - try: - data = json.dumps({"login": get_addon_setting("provider.username"), "loginOrigin": "input"}) - request("POST", f"{_LOGIN_URL}/api/login", data=data, session=session) - except RequestException: - log("Error while authenticating (login)", xbmc.LOGWARNING) - return - - try: - data = json.dumps({"password": get_addon_setting("provider.password"), "remember": True}) - request("POST", f"{_LOGIN_URL}/api/password", data=data, session=session) - except RequestException: - log("Error while authenticating (password)", xbmc.LOGWARNING) - - if "wassup" not in session.cookies: - log("Error while authenticating (wassup not found)", xbmc.LOGWARNING) - - def _get_channel_live_id(self, channel: dict) -> str: - """Get live id for given channel.""" - return channel["technicalChannels"]["live"][0]["liveTargetURLRelativePath"] - - def _extract_logo(self, logos: list, definition_type: str = "mobileAppliDark") -> str: - for logo in logos: - if logo["definitionType"] == definition_type: - return _STREAM_LOGO_URL.format(path=logo["listLogos"][0]["path"]) - - return None - def _get_programs(self, start_day: datetime, days_to_display: int, chunks_per_day: int, mco: str = "OFR") -> list: """Return the programs for today (default) or the specified period.""" programs = [] @@ -375,7 +467,8 @@ def _get_programs(self, start_day: datetime, days_to_display: int, chunks_per_da except ValueError: period = "today" - url = _PROGRAMS_ENDPOINT.format(period=period, mco=mco) + url = PROGRAMS_ENDPOINT.format(period=period, mco=mco) programs.extend(request_json(url, default=[])) + # log(f"programs : {programs}", xbmc.LOGINFO) return programs diff --git a/resources/lib/providers/fr/orange_france.py b/resources/lib/providers/fr/orange_france.py index f7f6bfa..fe7e115 100644 --- a/resources/lib/providers/fr/orange_france.py +++ b/resources/lib/providers/fr/orange_france.py @@ -8,6 +8,7 @@ class OrangeFranceProvider(AbstractOrangeProvider): def __init__(self): """Initialize Orange France provider.""" + super().__init__() self.mco = "OFR" self.groups = { "TNT": [192, 4, 80, 34, 47, 118, 111, 445, 119, 195, 446, 444, 234, 78, 481, 226, 458, 482, 1404, 1401] diff --git a/resources/lib/utils/gui.py b/resources/lib/utils/gui.py index 47a1109..2dd4a5d 100644 --- a/resources/lib/utils/gui.py +++ b/resources/lib/utils/gui.py @@ -53,7 +53,11 @@ def create_play_item(stream_info: dict = None, inputstream_addon: str = "") -> L play_item.setProperty("inputstream", inputstream_addon) play_item.setProperty("inputstream.adaptive.manifest_type", stream_info.get("protocol")) - play_item.setProperty("inputstream.adaptive.play_timeshift_buffer", "true") + + # https://github.com/xbmc/inputstream.adaptive/issues/574 + start = stream_info['start'] + InfoTagVideo = play_item.getVideoInfoTag() + InfoTagVideo.setResumePoint(start, 1) drm_config = stream_info.get("drm_config", {}) keys = ["license_type", "license_key", "license_data", "server_certificate", "license_flags", "pre_init_data"] diff --git a/resources/lib/utils/request.py b/resources/lib/utils/request.py index ea07007..0d89a0a 100644 --- a/resources/lib/utils/request.py +++ b/resources/lib/utils/request.py @@ -38,7 +38,7 @@ def get_random_ua() -> str: return _RANDOM_USER_AGENT -def request(method: str, url: str, headers: Mapping[str, str] = None, data=None, session: Session = None) -> Response: +def request(method: str, url: str, headers: Mapping[str, str] = {}, data=None, session: Session = None) -> Response: """Send HTTP request using requests.""" default_headers = { "Accept": "application/json", @@ -48,17 +48,20 @@ def request(method: str, url: str, headers: Mapping[str, str] = None, data=None, "User-Agent": get_random_ua(), } - headers = {**(session.headers if session is not None else default_headers), **(headers or {})} session = session or Session() + if 'User-Agent' in session.headers: + del session.headers['User-Agent'] + session.headers = {**default_headers, **session.headers, **headers} log(f"Fetching {url}", xbmc.LOGDEBUG) - res = session.request(method, url, headers=headers, data=data) - res.raise_for_status() + res = session.request(method, url, data=data) log(f" -> {res.status_code}", xbmc.LOGDEBUG) + log(f" -> {res.text}", xbmc.LOGDEBUG) + res.raise_for_status() return res -def request_json(url: str, headers: Mapping[str, str] = None, default: Union[dict, list] = None) -> Union[dict, list]: +def request_json(url: str, headers: Mapping[str, str] = {}, default: Union[dict, list] = None) -> Union[dict, list]: """Send HTTP request and load json response.""" try: res = request("GET", url, headers=headers) diff --git a/resources/settings.xml b/resources/settings.xml index e78675e..9b3c974 100644 --- a/resources/settings.xml +++ b/resources/settings.xml @@ -11,10 +11,10 @@ - + + -