Patch the main aiohttp.ClientSession() into YTlounge

This commit is contained in:
dmunozv04
2024-04-27 19:26:55 +02:00
parent ce95b6dbf0
commit 582b9bf725
4 changed files with 53 additions and 35 deletions

View File

@@ -5,9 +5,10 @@ import aiohttp
from . import api_helpers, ytlounge from . import api_helpers, ytlounge
async def pair_device(): async def pair_device(web_session):
try: try:
lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV") lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV",
web_session=web_session)
pairing_code = input( pairing_code = input(
"Enter pairing code (found in Settings - Link with TV code): " "Enter pairing code (found in Settings - Link with TV code): "
) )
@@ -33,6 +34,7 @@ async def pair_device():
def main(config, debug: bool) -> None: def main(config, debug: bool) -> None:
print("Welcome to the iSponsorBlockTV cli setup wizard") print("Welcome to the iSponsorBlockTV cli setup wizard")
loop = asyncio.get_event_loop_policy().get_event_loop() loop = asyncio.get_event_loop_policy().get_event_loop()
web_session = aiohttp.ClientSession()
if debug: if debug:
loop.set_debug(True) loop.set_debug(True)
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
@@ -43,16 +45,17 @@ def main(config, debug: bool) -> None:
" \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2" " \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2"
) )
if ( if (
input( input(
"Do you want to remove the legacy 'atvs' entry (the app won't start" "Do you want to remove the legacy 'atvs' entry (the app won't start"
" with it present)? (y/n) " " with it present)? (y/n) "
) )
== "y" == "y"
): ):
del config["atvs"] del config["atvs"]
devices = config.devices devices = config.devices
while not input(f"Paired with {len(devices)} Device(s). Add more? (y/n) ") == "n": while not input(
task = loop.create_task(pair_device()) f"Paired with {len(devices)} Device(s). Add more? (y/n) ") == "n":
task = loop.create_task(pair_device(web_session))
loop.run_until_complete(task) loop.run_until_complete(task)
device = task.result() device = task.result()
if device: if device:
@@ -65,10 +68,10 @@ def main(config, debug: bool) -> None:
apikey = input("Enter your API key: ") apikey = input("Enter your API key: ")
else: else:
if ( if (
input( input(
"API key only needed for the channel whitelist function. Add it? (y/n) " "API key only needed for the channel whitelist function. Add it? (y/n) "
) )
== "y" == "y"
): ):
print( print(
"Get youtube apikey here:" "Get youtube apikey here:"
@@ -79,7 +82,8 @@ def main(config, debug: bool) -> None:
skip_categories = config.skip_categories skip_categories = config.skip_categories
if skip_categories: if skip_categories:
if input("Skip categories already specified. Change them? (y/n) ") == "y": if input(
"Skip categories already specified. Change them? (y/n) ") == "y":
categories = input( categories = input(
"Enter skip categories (space or comma sepparated) Options: [sponsor" "Enter skip categories (space or comma sepparated) Options: [sponsor"
" selfpromo exclusive_access interaction poi_highlight intro outro" " selfpromo exclusive_access interaction poi_highlight intro outro"
@@ -103,8 +107,9 @@ def main(config, debug: bool) -> None:
channel_whitelist = config.channel_whitelist channel_whitelist = config.channel_whitelist
if ( if (
input("Do you want to whitelist any channels from being ad-blocked? (y/n) ") input(
== "y" "Do you want to whitelist any channels from being ad-blocked? (y/n) ")
== "y"
): ):
if not apikey: if not apikey:
print( print(
@@ -112,7 +117,6 @@ def main(config, debug: bool) -> None:
" otherwise the program will fail to start.\nYou can add one by" " otherwise the program will fail to start.\nYou can add one by"
" re-running this setup wizard." " re-running this setup wizard."
) )
web_session = aiohttp.ClientSession()
api_helper = api_helpers.ApiHelper(config, web_session) api_helper = api_helpers.ApiHelper(config, web_session)
while True: while True:
channel_info = {} channel_info = {}
@@ -152,7 +156,6 @@ def main(config, debug: bool) -> None:
channel_info["name"] = results[int(choice)][1] channel_info["name"] = results[int(choice)][1]
channel_whitelist.append(channel_info) channel_whitelist.append(channel_info)
# Close web session asynchronously # Close web session asynchronously
loop.run_until_complete(web_session.close())
config.channel_whitelist = channel_whitelist config.channel_whitelist = channel_whitelist
@@ -161,7 +164,8 @@ def main(config, debug: bool) -> None:
"Do you want to report skipped segments to sponsorblock. Only the segment" "Do you want to report skipped segments to sponsorblock. Only the segment"
" UUID will be sent? (y/n) " " UUID will be sent? (y/n) "
) )
== "n" == "n"
) )
print("Config finished") print("Config finished")
config.save() config.save()
loop.run_until_complete(web_session.close())

View File

@@ -10,13 +10,14 @@ from . import api_helpers, ytlounge
class DeviceListener: class DeviceListener:
def __init__(self, api_helper, config, device, debug: bool): def __init__(self, api_helper, config, device, debug: bool, web_session):
self.task: Optional[asyncio.Task] = None self.task: Optional[asyncio.Task] = None
self.api_helper = api_helper self.api_helper = api_helper
self.offset = device.offset self.offset = device.offset
self.name = device.name self.name = device.name
self.cancelled = False self.cancelled = False
self.logger = logging.getLogger(f"iSponsorBlockTV-{device.screen_id}") self.logger = logging.getLogger(f"iSponsorBlockTV-{device.screen_id}")
self.web_session = web_session
if debug: if debug:
self.logger.setLevel(logging.DEBUG) self.logger.setLevel(logging.DEBUG)
else: else:
@@ -28,7 +29,7 @@ class DeviceListener:
self.logger.addHandler(sh) self.logger.addHandler(sh)
self.logger.info(f"Starting device") self.logger.info(f"Starting device")
self.lounge_controller = ytlounge.YtLoungeApi( self.lounge_controller = ytlounge.YtLoungeApi(
device.screen_id, config, api_helper, self.logger device.screen_id, config, api_helper, self.logger, self.web_session
) )
# Ensures that we have a valid auth token # Ensures that we have a valid auth token
@@ -155,7 +156,7 @@ def main(config, debug):
web_session = aiohttp.ClientSession(loop=loop, connector=tcp_connector) web_session = aiohttp.ClientSession(loop=loop, connector=tcp_connector)
api_helper = api_helpers.ApiHelper(config, web_session) api_helper = api_helpers.ApiHelper(config, web_session)
for i in config.devices: for i in config.devices:
device = DeviceListener(api_helper, config, i, debug) device = DeviceListener(api_helper, config, i, debug, web_session)
devices.append(device) devices.append(device)
tasks.append(loop.create_task(device.loop())) tasks.append(loop.create_task(device.loop()))
tasks.append(loop.create_task(device.refresh_auth_loop())) tasks.append(loop.create_task(device.refresh_auth_loop()))
@@ -165,3 +166,5 @@ def main(config, debug):
print("Cancelling tasks and exiting...") print("Cancelling tasks and exiting...")
loop.run_until_complete(finish(devices)) loop.run_until_complete(finish(devices))
loop.run_until_complete(web_session.close()) loop.run_until_complete(web_session.close())
loop.run_until_complete(tcp_connector.close())
loop.close()

View File

@@ -234,7 +234,7 @@ class AddDevice(ModalWithClickExit):
def __init__(self, config, **kwargs) -> None: def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs) super().__init__(**kwargs)
self.config = config self.config = config
web_session = aiohttp.ClientSession() self.web_session = aiohttp.ClientSession()
self.api_helper = api_helpers.ApiHelper(config, web_session) self.api_helper = api_helpers.ApiHelper(config, web_session)
self.devices_discovered_dial = [] self.devices_discovered_dial = []
@@ -336,7 +336,7 @@ class AddDevice(ModalWithClickExit):
@on(Button.Pressed, "#add-device-pin-add-button") @on(Button.Pressed, "#add-device-pin-add-button")
async def handle_add_device_pin(self) -> None: async def handle_add_device_pin(self) -> None:
self.query_one("#add-device-pin-add-button").disabled = True self.query_one("#add-device-pin-add-button").disabled = True
lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV") lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV", web_session=self.web_session)
pairing_code = self.query_one("#pairing-code-input").value pairing_code = self.query_one("#pairing-code-input").value
pairing_code = int( pairing_code = int(
pairing_code.replace("-", "").replace(" ", "") pairing_code.replace("-", "").replace(" ", "")

View File

@@ -1,3 +1,4 @@
from aiohttp import ClientSession
import asyncio import asyncio
import json import json
@@ -9,8 +10,13 @@ create_task = asyncio.create_task
class YtLoungeApi(pyytlounge.YtLoungeApi): class YtLoungeApi(pyytlounge.YtLoungeApi):
def __init__(self, screen_id, config=None, api_helper=None, logger=None): def __init__(self, screen_id, config=None, api_helper=None, logger=None,
web_session: ClientSession = None):
super().__init__("iSponsorBlockTV", logger=logger) super().__init__("iSponsorBlockTV", logger=logger)
if web_session is not None:
asyncio.get_event_loop().run_until_complete(
self.close()) # Close the default connection
self.session = web_session # And use the one we passed
self.auth.screen_id = screen_id self.auth.screen_id = screen_id
self.auth.lounge_id_token = None self.auth.lounge_id_token = None
self.api_helper = api_helper self.api_helper = api_helper
@@ -75,13 +81,13 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
self.logger.info("Ad has ended, unmuting") self.logger.info("Ad has ended, unmuting")
create_task(self.mute(False, override=True)) create_task(self.mute(False, override=True))
elif ( elif (
self.skip_ads and data["isSkipEnabled"] == "true" self.skip_ads and data["isSkipEnabled"] == "true"
): # YouTube uses strings for booleans ): # YouTube uses strings for booleans
self.logger.info("Ad can be skipped, skipping") self.logger.info("Ad can be skipped, skipping")
create_task(self.skip_ad()) create_task(self.skip_ad())
create_task(self.mute(False, override=True)) create_task(self.mute(False, override=True))
elif ( elif (
self.mute_ads self.mute_ads
): # Seen multiple other adStates, assuming they are all ads ): # Seen multiple other adStates, assuming they are all ads
self.logger.info("Ad has started, muting") self.logger.info("Ad has started, muting")
create_task(self.mute(True, override=True)) create_task(self.mute(True, override=True))
@@ -92,7 +98,7 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
# Gets segments for the next video before it starts playing # Gets segments for the next video before it starts playing
elif event_type == "autoplayUpNext": elif event_type == "autoplayUpNext":
if len(args) > 0 and ( if len(args) > 0 and (
vid_id := args[0]["videoId"] vid_id := args[0]["videoId"]
): # if video id is not empty ): # if video id is not empty
self.logger.info(f"Getting segments for next video: {vid_id}") self.logger.info(f"Getting segments for next video: {vid_id}")
create_task(self.api_helper.get_segments(vid_id)) create_task(self.api_helper.get_segments(vid_id))
@@ -105,13 +111,13 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
self.logger.info(f"Getting segments for next video: {vid_id}") self.logger.info(f"Getting segments for next video: {vid_id}")
create_task(self.api_helper.get_segments(vid_id)) create_task(self.api_helper.get_segments(vid_id))
elif ( elif (
self.skip_ads and data["isSkipEnabled"] == "true" self.skip_ads and data["isSkipEnabled"] == "true"
): # YouTube uses strings for booleans ): # YouTube uses strings for booleans
self.logger.info("Ad can be skipped, skipping") self.logger.info("Ad can be skipped, skipping")
create_task(self.skip_ad()) create_task(self.skip_ad())
create_task(self.mute(False, override=True)) create_task(self.mute(False, override=True))
elif ( elif (
self.mute_ads self.mute_ads
): # Seen multiple other adStates, assuming they are all ads ): # Seen multiple other adStates, assuming they are all ads
self.logger.info("Ad has started, muting") self.logger.info("Ad has started, muting")
create_task(self.mute(True, override=True)) create_task(self.mute(True, override=True))
@@ -122,7 +128,8 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
for device in devices: for device in devices:
if device["type"] == "LOUNGE_SCREEN": if device["type"] == "LOUNGE_SCREEN":
device_info = json.loads(device.get("deviceInfo", "{}")) device_info = json.loads(device.get("deviceInfo", "{}"))
if device_info.get("clientName", "") in youtube_client_blacklist: if device_info.get("clientName",
"") in youtube_client_blacklist:
self._sid = None self._sid = None
self._gsession = None # Force disconnect self._gsession = None # Force disconnect
@@ -134,7 +141,8 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
create_task(self.play_video(video_id_saved)) create_task(self.play_video(video_id_saved))
elif event_type == "loungeScreenDisconnected": elif event_type == "loungeScreenDisconnected":
data = args[0] data = args[0]
if data["reason"] == "disconnectedByUserScreenInitiated": # Short playing? if data[
"reason"] == "disconnectedByUserScreenInitiated": # Short playing?
self.shorts_disconnected = True self.shorts_disconnected = True
super()._process_event(event_id, event_type, args) super()._process_event(event_id, event_type, args)
@@ -152,17 +160,20 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
mute_str = "true" mute_str = "true"
else: else:
mute_str = "false" mute_str = "false"
if override or not (self.volume_state.get("muted", "false") == mute_str): if override or not (
self.volume_state.get("muted", "false") == mute_str):
self.volume_state["muted"] = mute_str self.volume_state["muted"] = mute_str
# YouTube wants the volume when unmuting, so we send it # YouTube wants the volume when unmuting, so we send it
await super()._command( await super()._command(
"setVolume", "setVolume",
{"volume": self.volume_state.get("volume", 100), "muted": mute_str}, {"volume": self.volume_state.get("volume", 100),
"muted": mute_str},
) )
async def set_auto_play_mode(self, enabled: bool): async def set_auto_play_mode(self, enabled: bool):
await super()._command( await super()._command(
"setAutoplayMode", {"autoplayMode": "ENABLED" if enabled else "DISABLED"} "setAutoplayMode",
{"autoplayMode": "ENABLED" if enabled else "DISABLED"}
) )
async def play_video(self, video_id: str) -> bool: async def play_video(self, video_id: str) -> bool: