diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 8527753..68db9e1 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -1,17 +1,15 @@ -# This is a basic workflow to help you get started with Actions - name: build docker images # Controls when the workflow will run on: push: branches: - - 'main' + - '*' tags: - 'v*' pull_request: branches: - - 'main' + - '*' workflow_dispatch: @@ -19,52 +17,53 @@ permissions: contents: read packages: write -# A workflow run is made up of one or more jobs that can run sequentially or in parallel jobs: build: runs-on: ubuntu-latest steps: # Get the repository's code - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 # Generate docker tags - name: Docker meta id: meta - uses: docker/metadata-action@v3 + uses: docker/metadata-action@v4 with: images: ghcr.io/dmunozv04/isponsorblocktv, dmunozv04/isponsorblocktv - tags: | - type=raw,value=latest,priority=900,enable=${{ github.ref == format('refs/heads/{0}', github.event.repository.default_branch) }} - type=ref,enable=true,priority=600,prefix=pr-,suffix=,event=pr +# tags: | +# type=raw,value=latest,priority=900,enable=${{ github.ref == format('refs/heads/{0}', github.event.repository.default_branch) }} +# type=ref,enable=true,priority=600,prefix=pr-,suffix=,event=pr # https://github.com/docker/setup-qemu-action - name: Set up QEMU - uses: docker/setup-qemu-action@v1 + uses: docker/setup-qemu-action@v2 # https://github.com/docker/setup-buildx-action - name: Set up Docker Buildx id: buildx - uses: docker/setup-buildx-action@v1 + uses: docker/setup-buildx-action@v2 - name: Login to DockerHub - uses: docker/login-action@v1 + if: github.event_name != 'pull_request' + uses: docker/login-action@v2 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Login to GHCR - uses: docker/login-action@v1 + if: github.event_name != 'pull_request' + uses: docker/login-action@v2 with: registry: ghcr.io username: ${{ github.repository_owner }} password: ${{ secrets.GITHUB_TOKEN }} - name: Build and push - uses: docker/build-push-action@v2 + uses: docker/build-push-action@v4 with: context: . - platforms: linux/amd64, linux/arm64 - push: true + platforms: linux/amd64, linux/arm64, linux/arm/v7 + push: ${{ github.event_name != 'pull_request' }} tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} cache-from: type=registry,ref=ghcr.io/dmunozv04/isponsorblocktv:buildcache diff --git a/.gitignore b/.gitignore index 75c1fae..a9ada3d 100644 --- a/.gitignore +++ b/.gitignore @@ -154,7 +154,9 @@ cython_debug/ #.idea/ #config folder -config/ +data/ config.json +.DS_Store + .DS_Store \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 757ee13..653475f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,19 +1,13 @@ # syntax=docker/dockerfile:1 -FROM python:alpine +FROM python:alpine3.11 -RUN python -m venv /opt/venv - -ENV PATH="/opt/venv/bin:$PATH" PIP_NO_CACHE_DIR=off iSPBTV_docker=True +ENV PIP_NO_CACHE_DIR=off iSPBTV_docker=True TERM=xterm-256color COLORTERM=truecolor COPY requirements.txt . -RUN apk add gcc musl-dev build-base linux-headers libffi-dev rust cargo openssl-dev git avahi && \ - pip install --upgrade pip setuptools-rust wheel && \ - pip install -r requirements.txt && \ - apk del gcc musl-dev build-base linux-headers libffi-dev rust cargo openssl-dev git && \ - rm -rf /root/.cache /root/.cargo - +RUN pip install --upgrade pip wheel && \ + pip install -r requirements.txt COPY requirements.txt . @@ -21,4 +15,4 @@ WORKDIR /app COPY . . -ENTRYPOINT ["/opt/venv/bin/python3", "-u", "main.py"] +ENTRYPOINT ["python3", "-u", "main.py"] \ No newline at end of file diff --git a/config.json.template b/config.json.template index f5d303c..4dd4479 100644 --- a/config.json.template +++ b/config.json.template @@ -1,8 +1,21 @@ { - "atvs": [ - {"identifier": "", "airplay_credentials": ""} + "devices": [ + { + "screen_id": "", + "name": "YouTube on TV", + "offset": 0 + } ], - "apikey":"", - "skip_categories": ["sponsor"], - "channel_whitelist": [] + "skip_categories": [ + "sponsor" + ], + "skip_count_tracking": true, + "mute_ads": true, + "skip_ads": true, + "apikey": "", + "channel_whitelist": [ + {"id": "", + "name": "" + } + ] } \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..5632a0b --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,8 @@ +version: '3.3' +services: + iSponsorBlockTV: + image: ghcr.io/dmunozv04/isponsorblocktv + container_name: iSponsorBlockTV + restart: unless-stopped + volumes: + - /PATH_TO_YOUR_DATA_DIR:/app/data \ No newline at end of file diff --git a/iSponsorBlockTV/api_helpers.py b/iSponsorBlockTV/api_helpers.py index 81d06ce..2afc20c 100644 --- a/iSponsorBlockTV/api_helpers.py +++ b/iSponsorBlockTV/api_helpers.py @@ -1,7 +1,9 @@ from cache import AsyncTTL, AsyncLRU -from . import constants +from .conditional_ttl_cache import AsyncConditionalTTL +from . import constants, dial_client from hashlib import sha256 from asyncio import create_task +from aiohttp import ClientSession import html @@ -15,107 +17,144 @@ def listToTuple(function): return wrapper -@AsyncLRU(maxsize=10) -async def get_vid_id(title, artist, api_key, web_session): - params = {"q": title + " " + artist, "key": api_key, "part": "snippet"} - url = constants.Youtube_api + "search" - async with web_session.get(url, params=params) as resp: - data = await resp.json() - - if "error" in data: +# Class that handles all the api calls and their cache +class ApiHelper: + def __init__(self, config, web_session: ClientSession) -> None: + self.apikey = config.apikey + self.skip_categories = config.skip_categories + self.channel_whitelist = config.channel_whitelist + self.skip_count_tracking = config.skip_count_tracking + self.web_session = web_session + self.num_devices = len(config.devices) + + # Not used anymore, maybe it can stay here a little longer + @AsyncLRU(maxsize=10) + async def get_vid_id(self, title, artist, api_key, web_session): + params = {"q": title + " " + artist, "key": api_key, "part": "snippet"} + url = constants.Youtube_api + "search" + async with web_session.get(url, params=params) as resp: + data = await resp.json() + + if "error" in data: + return + + for i in data["items"]: + if (i["id"]["kind"] != "youtube#video"): + continue + title_api = html.unescape(i["snippet"]["title"]) + artist_api = html.unescape(i["snippet"]["channelTitle"]) + if title_api == title and artist_api == artist: + return (i["id"]["videoId"], i["snippet"]["channelId"]) return - for i in data["items"]: - if (i["id"]["kind"] != "youtube#video"): - continue - title_api = html.unescape(i["snippet"]["title"]) - artist_api = html.unescape(i["snippet"]["channelTitle"]) - if title_api == title and artist_api == artist: - return (i["id"]["videoId"], i["snippet"]["channelId"]) - return + @AsyncLRU(maxsize=100) + async def is_whitelisted(self, vid_id): + if (self.apikey and self.channel_whitelist): + channel_id = await self.__get_channel_id(vid_id) + # check if channel id is in whitelist + for i in self.channel_whitelist: + if i["id"] == channel_id: + return True + return False -@AsyncLRU(maxsize=10) -async def search_channels(channel, api_key, web_session): - channels = [] - params = {"q": channel, "key": api_key, "part": "snippet", "type": "channel", "maxResults": "5"} - url = constants.Youtube_api + "search" - async with web_session.get(url, params=params) as resp: - data = await resp.json() + async def __get_channel_id(self, vid_id): + params = {"id": vid_id, "key": self.apikey, "part": "snippet"} + url = constants.Youtube_api + "videos" + async with self.web_session.get(url, params=params) as resp: + data = await resp.json() - if "error" in data: + if "error" in data: + return + data = data["items"][0] + if (data["kind"] != "youtube#video"): + return + return data["snippet"]["channelId"] + + @AsyncLRU(maxsize=10) + async def search_channels(self, channel): + channels = [] + params = {"q": channel, "key": self.apikey, "part": "snippet", "type": "channel", "maxResults": "5"} + url = constants.Youtube_api + "search" + async with self.web_session.get(url, params=params) as resp: + data = await resp.json() + if "error" in data: + return channels + + for i in data["items"]: + # Get channel subcription number + params = {"id": i["snippet"]["channelId"], "key": self.apikey, "part": "statistics"} + url = constants.Youtube_api + "channels" + async with self.web_session.get(url, params=params) as resp: + channelData = await resp.json() + + if channelData["items"][0]["statistics"]["hiddenSubscriberCount"]: + subCount = "Hidden" + else: + subCount = int(channelData["items"][0]["statistics"]["subscriberCount"]) + subCount = format(subCount, "_") + + channels.append((i["snippet"]["channelId"], i["snippet"]["channelTitle"], subCount)) return channels - for i in data["items"]: - # Get channel subcription number - params = {"id": i["snippet"]["channelId"], "key": api_key, "part": "statistics"} - url = constants.Youtube_api + "channels" - async with web_session.get(url, params=params) as resp: - channelData = await resp.json() + @listToTuple # Convert list to tuple so it can be used as a key in the cache + @AsyncConditionalTTL(time_to_live=300, maxsize=10) # 5 minutes for non-locked segments + async def get_segments(self, vid_id): + if await self.is_whitelisted(vid_id): + print("Video is whitelisted") + return ([], True) # Return empty list and True to indicate that the cache should last forever + vid_id_hashed = sha256(vid_id.encode("utf-8")).hexdigest()[ + :4 + ] # Hashes video id and gets the first 4 characters + params = { + "category": self.skip_categories, + "actionType": constants.SponsorBlock_actiontype, + "service": constants.SponsorBlock_service, + } + headers = {"Accept": "application/json"} + url = constants.SponsorBlock_api + "skipSegments/" + vid_id_hashed + async with self.web_session.get(url, headers=headers, params=params) as response: + response = await response.json() + for i in response: + if str(i["videoID"]) == str(vid_id): + response = i + break + segments = [] + ignore_ttl = True + try: + for i in response["segments"]: + ignore_ttl = ignore_ttl and i["locked"] == 1 # If all segments are locked, ignore ttl + segment = i["segment"] + UUID = i["UUID"] + segment_dict = {"start": segment[0], "end": segment[1], "UUID": [UUID]} + try: + # Get segment before to check if they are too close to each other + segment_before_end = segments[-1]["end"] + segment_before_start = segments[-1]["start"] + segment_before_UUID = segments[-1]["UUID"] - if channelData["items"][0]["statistics"]["hiddenSubscriberCount"]: - subCount = "Hidden" - else: - subCount = channelData["items"][0]["statistics"]["subscriberCount"] + except Exception: + segment_before_end = -10 + if ( + segment_dict["start"] - segment_before_end < 1 + ): # Less than 1 second appart, combine them and skip them together + segment_dict["start"] = segment_before_start + segment_dict["UUID"].append(segment_before_UUID) + segments.pop() + segments.append(segment_dict) + except Exception: + pass + return (segments, ignore_ttl) - channels.append((i["snippet"]["channelId"], i["snippet"]["channelTitle"], subCount)) + async def mark_viewed_segments(self, UUID): + """Marks the segments as viewed in the SponsorBlock API, if skip_count_tracking is enabled. Lets the contributor know that someone skipped the segment (thanks)""" + if self.skip_count_tracking: + for i in UUID: + url = constants.SponsorBlock_api + "viewedVideoSponsorTime/" + params = {"UUID": i} + await self.web_session.post(url, params=params) - return channels - -@listToTuple -@AsyncTTL(time_to_live=300, maxsize=5) -async def get_segments(vid_id, web_session, categories=["sponsor"]): - vid_id_hashed = sha256(vid_id.encode("utf-8")).hexdigest()[ - :4 - ] # Hashes video id and get the first 4 characters - params = { - "category": categories, - "actionType": constants.SponsorBlock_actiontype, - "service": constants.SponsorBlock_service, - } - headers = {"Accept": "application/json"} - url = constants.SponsorBlock_api + "skipSegments/" + vid_id_hashed - async with web_session.get(url, headers=headers, params=params) as response: - response = await response.json() - for i in response: - if str(i["videoID"]) == str(vid_id): - response = i - break - segments = [] - try: - for i in response["segments"]: - segment = i["segment"] - UUID = i["UUID"] - segment_dict = {"start": segment[0], "end": segment[1], "UUID": [UUID]} - try: - # Get segment before to check if they are too close to each other - segment_before_end = segments[-1]["end"] - segment_before_start = segments[-1]["start"] - segment_before_UUID = segments[-1]["UUID"] - - except: - segment_before_end = -10 - if ( - segment_dict["start"] - segment_before_end < 1 - ): # Less than 1 second appart, combine them and skip them together - segment_dict["start"] = segment_before_start - segment_dict["UUID"].append(segment_before_UUID) - segments.pop() - segments.append(segment_dict) - except: - pass - return segments - - -async def viewed_segments(UUID, web_session): - url = constants.SponsorBlock_api + "viewedVideoSponsorTime/" - for i in UUID: - create_task(mark_viewed_segment(i, web_session)) - return - - -async def mark_viewed_segment(UUID, web_session): - url = constants.SponsorBlock_api + "viewedVideoSponsorTime/" - params = {"UUID": UUID} - async with web_session.post(url, params=params) as response: - response_text = await response.text() - return + async def discover_youtube_devices_dial(self): + """Discovers YouTube devices using DIAL""" + dial_screens = await dial_client.discover(self.web_session) + # print(dial_screens) + return dial_screens diff --git a/iSponsorBlockTV/conditional_ttl_cache.py b/iSponsorBlockTV/conditional_ttl_cache.py new file mode 100644 index 0000000..49fd0e7 --- /dev/null +++ b/iSponsorBlockTV/conditional_ttl_cache.py @@ -0,0 +1,87 @@ +"""MIT License + +Copyright (c) 2020 Rajat Singh + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.""" +'''Modified code from https://github.com/iamsinghrajat/async-cache''' + +from cache.key import KEY +from cache.lru import LRU +import datetime + + +class AsyncConditionalTTL: + class _TTL(LRU): + def __init__(self, time_to_live, maxsize): + super().__init__(maxsize=maxsize) + + self.time_to_live = datetime.timedelta( + seconds=time_to_live + ) if time_to_live else None + + self.maxsize = maxsize + + def __contains__(self, key): + if key not in self.keys(): + return False + else: + key_expiration = super().__getitem__(key)[1] + if key_expiration and key_expiration < datetime.datetime.now(): + del self[key] + return False + else: + return True + + def __getitem__(self, key): + value = super().__getitem__(key)[0] + return value + + def __setitem__(self, key, value): + value, ignore_ttl = value # unpack tuple + ttl_value = ( + datetime.datetime.now() + self.time_to_live + ) if (self.time_to_live and not ignore_ttl) else None # ignore ttl if ignore_ttl is True + super().__setitem__(key, (value, ttl_value)) + + def __init__( + self, time_to_live=60, maxsize=1024, skip_args: int = 0 + ): + """ + + :param time_to_live: Use time_to_live as None for non expiring cache + :param maxsize: Use maxsize as None for unlimited size cache + :param skip_args: Use `1` to skip first arg of func in determining cache key + """ + self.ttl = self._TTL(time_to_live=time_to_live, maxsize=maxsize) + self.skip_args = skip_args + + def __call__(self, func): + async def wrapper(*args, **kwargs): + key = KEY(args[self.skip_args:], kwargs) + if key in self.ttl: + val = self.ttl[key] + else: + self.ttl[key] = await func(*args, **kwargs) + val = self.ttl[key] + + return val + + wrapper.__name__ += func.__name__ + + return wrapper diff --git a/iSponsorBlockTV/config_setup.py b/iSponsorBlockTV/config_setup.py index 7aed0cb..f9ae79d 100644 --- a/iSponsorBlockTV/config_setup.py +++ b/iSponsorBlockTV/config_setup.py @@ -1,107 +1,67 @@ -import pyatv import json import asyncio -from pyatv.const import OperatingSystem import sys import aiohttp -import asyncio -from . import api_helpers +from . import api_helpers, ytlounge -def save_config(config, config_file): - with open(config_file, "w") as f: - json.dump(config, f) - - -# Taken from postlund/pyatv atvremote.py -async def _read_input(loop: asyncio.AbstractEventLoop, prompt: str): - sys.stdout.write(prompt) - sys.stdout.flush() - user_input = await loop.run_in_executor(None, sys.stdin.readline) - return user_input.strip() - - -async def find_atvs(loop): - devices = await pyatv.scan(loop) - if not devices: - print("No devices found") +async def pair_device(loop): + try: + lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV") + pairing_code = input("Enter pairing code (found in Settings - Link with TV code): ") + pairing_code = int(pairing_code.replace("-", "").replace(" ","")) # remove dashes and spaces + print("Pairing...") + paired = await lounge_controller.pair(pairing_code) + if not paired: + print("Failed to pair device") + return + device = { + "screen_id": lounge_controller.auth.screen_id, + "name": lounge_controller.screen_name, + } + print(f"Paired device: {device['name']}") + return device + except Exception as e: + print(f"Failed to pair device: {e}") return - atvs = [] - for i in devices: - # Only get Apple TV's - if ( - i.device_info.operating_system == OperatingSystem.TvOS - and input(f"Found {i.name}. Do you want to add it? (y/n) ") == "y" - ): - identifier = i.identifier - - pairing = await pyatv.pair( - i, loop=loop, protocol=pyatv.Protocol.AirPlay - ) - await pairing.begin() - if pairing.device_provides_pin: - pin = await _read_input(loop, "Enter PIN on screen: ") - pairing.pin(pin) - - await pairing.finish() - if pairing.has_paired: - creds = pairing.service.credentials - atvs.append( - {"identifier": identifier, "airplay_credentials": creds} - ) - print("Pairing successful") - await pairing.close() - return atvs - - -def main(config, config_file, debug): + +def main(config, debug: bool) -> None: + print("Welcome to the iSponsorBlockTV cli setup wizard") loop = asyncio.get_event_loop_policy().get_event_loop() - try: - num_atvs = len(config["atvs"]) - except: - num_atvs = 0 - if ( - input("Found {} Apple TV(s) in config.json. Add more? (y/n) ".format(num_atvs)) - == "y" - ): - loop = asyncio.get_event_loop_policy().get_event_loop() - if debug: - loop.set_debug(True) - asyncio.set_event_loop(loop) - task = loop.create_task(find_atvs(loop)) + if debug: + loop.set_debug(True) + asyncio.set_event_loop(loop) + if hasattr(config, "atvs"): + print("The atvs config option is deprecated and has stopped working. Please read this for more information on how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2") + if input("Do you want to remove the legacy 'atvs' entry (the app won't start with it present)? (y/n) ") == "y": + del config["atvs"] + devices = config.devices + while not input(f"Paired with {len(devices)} Device(s). Add more? (y/n) ") == "n": + task = loop.create_task(pair_device(loop)) loop.run_until_complete(task) - atvs = task.result() - try: - for i in atvs: - config["atvs"].append(i) - print("Done adding") - except: - print("Rewriting atvs (don't worry if none were saved before)") - config["atvs"] = atvs + device = task.result() + if device: + devices.append(device) + config.devices = devices - try: - apikey = config["apikey"] - except: - apikey = "" - if apikey != "": + apikey = config.apikey + if apikey: if input("API key already specified. Change it? (y/n) ") == "y": apikey = input("Enter your API key: ") config["apikey"] = apikey else: - print( - "Get youtube apikey here: https://developers.google.com/youtube/registering_an_application" - ) - apikey = input("Enter your API key: ") - config["apikey"] = apikey - - try: - skip_categories = config["skip_categories"] - except: - skip_categories = [] - - if skip_categories != []: + if input("API key only needed for the channel whitelist function. Add it? (y/n) ") == "y": + print( + "Get youtube apikey here: https://developers.google.com/youtube/registering_an_application" + ) + apikey = input("Enter your API key: ") + config["apikey"] = apikey + config.apikey = apikey + + skip_categories = config.skip_categories + if skip_categories: if input("Skip categories already specified. Change them? (y/n) ") == "y": categories = input( - "Enter skip categories (space or comma sepparated) Options: [sponsor selfpromo exclusive_access interaction poi_highlight intro outro preview filler music_offtopic:\n" + "Enter skip categories (space or comma sepparated) Options: [sponsor selfpromo exclusive_access interaction poi_highlight intro outro preview filler music_offtopic]:\n" ) skip_categories = categories.replace(",", " ").split(" ") skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings @@ -111,14 +71,12 @@ def main(config, config_file, debug): ) skip_categories = categories.replace(",", " ").split(" ") skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings - config["skip_categories"] = skip_categories - - try: - channel_whitelist = config["channel_whitelist"] - except: - channel_whitelist = [] + config.skip_categories = skip_categories + channel_whitelist = config.channel_whitelist if input("Do you want to whitelist any channels from being ad-blocked? (y/n) ") == "y": + if(not apikey): + print("WARNING: You need to specify an API key to use this function, otherwise the program will fail to start.\nYou can add one by re-running this setup wizard.") web_session = aiohttp.ClientSession() while True: channel_info = {} @@ -158,7 +116,8 @@ def main(config, config_file, debug): # Close web session asynchronously loop.run_until_complete(web_session.close()) - config["channel_whitelist"] = channel_whitelist - + config.channel_whitelist = channel_whitelist + + config.skip_count_tracking = not input("Do you want to report skipped segments to sponsorblock. Only the segment UUID will be sent? (y/n) ") == "n" print("Config finished") - save_config(config, config_file) + config.save() diff --git a/iSponsorBlockTV/constants.py b/iSponsorBlockTV/constants.py index ad07205..1f3f75f 100644 --- a/iSponsorBlockTV/constants.py +++ b/iSponsorBlockTV/constants.py @@ -4,3 +4,16 @@ SponsorBlock_actiontype = "skip" SponsorBlock_api = "https://sponsor.ajay.app/api/" Youtube_api = "https://www.googleapis.com/youtube/v3/" + +skip_categories = ( + ('Sponsor', 'sponsor'), + ('Self Promotion', 'selfpromo'), + ('Intro', 'intro'), + ('Outro', 'outro'), + ('Music Offtopic', 'music_offtopic'), + ('Interaction', 'interaction'), + ('Exclusive Access', 'exclusive_access'), + ('POI Highlight', 'poi_highlight'), + ('Preview', 'preview'), + ('Filler', 'filler'), +) diff --git a/iSponsorBlockTV/dial_client.py b/iSponsorBlockTV/dial_client.py new file mode 100644 index 0000000..aa553cd --- /dev/null +++ b/iSponsorBlockTV/dial_client.py @@ -0,0 +1,142 @@ +"""Send out a M-SEARCH request and listening for responses.""" +import asyncio +import socket + +import aiohttp +import ssdp +from ssdp import network +import xmltodict + +''' +Redistribution and use of the DIAL DIscovery And Launch protocol specification (the “DIAL Specification”), with or without modification, +are permitted provided that the following conditions are met: +● Redistributions of the DIAL Specification must retain the above copyright notice, this list of conditions and the following +disclaimer. +● Redistributions of implementations of the DIAL Specification in source code form must retain the above copyright notice, this +list of conditions and the following disclaimer. +● Redistributions of implementations of the DIAL Specification in binary form must include the above copyright notice. +● The DIAL mark, the NETFLIX mark and the names of contributors to the DIAL Specification may not be used to endorse or +promote specifications, software, products, or any other materials derived from the DIAL Specification without specific prior +written permission. The DIAL mark is owned by Netflix and information on licensing the DIAL mark is available at +www.dial-multiscreen.org.''' + +''' +MIT License + +Copyright (c) 2018 Johannes Hoppe + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.''' +'''Modified code from https://github.com/codingjoe/ssdp/blob/main/ssdp/__main__.py''' + +def get_ip(): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.settimeout(0) + try: + # doesn't even have to be reachable + s.connect(('10.254.254.254', 1)) + IP = s.getsockname()[0] + except Exception: + IP = '127.0.0.1' + finally: + s.close() + return IP + + +class Handler(ssdp.aio.SSDP): + + def __init__(self): + super().__init__() + self.devices = [] + + def clear(self): + self.devices = [] + + def __call__(self): + return self + def response_received(self, response: ssdp.messages.SSDPResponse, addr): + headers = response.headers + headers = {k.lower(): v for k, v in headers} + # print(headers) + if "location" in headers: + self.devices.append(headers["location"]) + + +async def find_youtube_app(web_session, url_location): + async with web_session.get(url_location) as response: + headers = response.headers + response = await response.text() + # print(headers) + + data = xmltodict.parse(response) + name = data["root"]["device"]["friendlyName"] + handler = Handler() + handler.clear() + app_url = headers["application-url"] + youtube_url = app_url + "YouTube" + # print(youtube_url) + async with web_session.get(youtube_url) as response: + status_code = response.status + response = await response.text() + # print(status_code) + if status_code == 200: + data = xmltodict.parse(response) + data = data["service"] + screen_id = data["additionalData"]["screenId"] + return {"screen_id": screen_id, "name": name, "offset": 0} + + +async def discover(web_session): + bind = None + search_target = "urn:dial-multiscreen-org:service:dial:1" + max_wait = 10 + handler = Handler() + """Send out an M-SEARCH request and listening for responses.""" + family, addr = network.get_best_family(bind, network.PORT) + loop = asyncio.get_event_loop() + ip_address = get_ip() + connect = loop.create_datagram_endpoint(handler, family=family, local_addr=(ip_address, None)) + transport, protocol = await connect + + target = network.MULTICAST_ADDRESS_IPV4, network.PORT + + search_request = ssdp.messages.SSDPRequest( + "M-SEARCH", + headers={ + "HOST": "%s:%d" % target, + "MAN": '"ssdp:discover"', + "MX": str(max_wait), # seconds to delay response [1..5] + "ST": search_target, + }, + ) + + target = network.MULTICAST_ADDRESS_IPV4, network.PORT + + search_request.sendto(transport, target) + + # print(search_request, addr[:2]) + try: + await asyncio.sleep(4) + finally: + transport.close() + + devices = [] + for i in handler.devices: + devices.append(await find_youtube_app(web_session, i)) + + return devices diff --git a/iSponsorBlockTV/helpers.py b/iSponsorBlockTV/helpers.py index daf6cd8..392b333 100644 --- a/iSponsorBlockTV/helpers.py +++ b/iSponsorBlockTV/helpers.py @@ -1,55 +1,118 @@ import argparse -from . import config_setup -from . import main -from . import macos_install import json -import os import logging +import os import sys +import time -def load_config(config_file): - if os.path.exists(config_file): - try: - with open(config_file, "r") as f: - config = json.load(f) - except: - print("Creating config file") - config = {} - else: - if os.getenv("iSPBTV_docker"): +from . import config_setup, main, setup_wizard + + +class Device: + def __init__(self, args_dict): + self.screen_id = "" + self.offset = 0 + self.__load(args_dict) + self.__validate() + + def __load(self, args_dict): + for i in args_dict: + setattr(self, i, args_dict[i]) + # Change offset to seconds (from milliseconds) + self.offset = self.offset / 1000 + + def __validate(self): + if not self.screen_id: + raise ValueError("No screen id found") + + +class Config: + def __init__(self, data_dir): + self.data_dir = data_dir + self.config_file = data_dir + "/config.json" + + self.devices = [] + self.apikey = "" + self.skip_categories = [] + self.channel_whitelist = [] + self.skip_count_tracking = True + self.mute_ads = False + self.skip_ads = False + self.__load() + + def validate(self): + if hasattr(self, "atvs"): print( - "You are running in docker, you have to mount the config file.\nPlease check the README.md for more information." + "The atvs config option is deprecated and has stopped working. Please read this for more information on how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2", ) + print("Exiting in 10 seconds...") + time.sleep(10) sys.exit() - return - else: - print("Creating config file") - config = {} # Create blank config to setup - return config + if not self.devices: + print("No devices found, please add at least one device") + print("Exiting in 10 seconds...") + time.sleep(10) + sys.exit() + self.devices = [Device(i) for i in self.devices] + if not self.apikey and self.channel_whitelist: + raise ValueError("No youtube API key found and channel whitelist is not empty") + if not self.skip_categories: + self.categories = ["sponsor"] + print("No categories found, using default: sponsor") + + def __load(self): + try: + with open(self.config_file, "r") as f: + config = json.load(f) + for i in config: + setattr(self, i, config[i]) + except FileNotFoundError: + print("Could not load config file") + # Create data directory if it doesn't exist (if we're not running in docker) + if not os.path.exists(self.data_dir): + if not os.getenv("iSPBTV_docker"): + print("Creating data directory") + os.makedirs(self.data_dir) + else: # Running in docker without mounting the data dir + print("Running in docker without mounting the data dir, check the wiki for more information: " + "https://github.com/dmunozv04/iSponsorBlockTV/wiki/Installation#Docker") + print("Exiting in 10 seconds...") + time.sleep(10) + sys.exit() + else: + print("Blank config file created") + + def save(self): + with open(self.config_file, "w") as f: + config_dict = self.__dict__ + # Don't save the config file name + config_file = self.config_file + del config_dict["config_file"] + json.dump(config_dict, f, indent=4) + self.config_file = config_file + + def __eq__(self, other): + if isinstance(other, Config): + return self.__dict__ == other.__dict__ + return False def app_start(): parser = argparse.ArgumentParser(description="iSponsorblockTV") - parser.add_argument("--file", "-f", default="config.json", help="config file") - parser.add_argument("--setup", "-s", action="store_true", help="setup the program") - parser.add_argument("--debug", "-d", action="store_true", help="debug mode") - parser.add_argument("--macos_install", action="store_true", help="install in macOS") + parser.add_argument("--data-dir", "-d", default="data", help="data directory") + parser.add_argument("--setup", "-s", action="store_true", help="setup the program graphically") + parser.add_argument("--setup-cli", "-sc", action="store_true", help="setup the program in the command line") + parser.add_argument("--debug", action="store_true", help="debug mode") args = parser.parse_args() - config = load_config(args.file) + config = Config(args.data_dir) if args.debug: logging.basicConfig(level=logging.DEBUG) - if args.setup: # Setup the config file - config_setup.main(config, args.file, args.debug) - elif args.macos_install: - macos_install.main() - + if args.setup: # Set up the config file graphically + setup_wizard.main(config) + sys.exit() + if args.setup_cli: # Set up the config file + config_setup.main(config, args.debug) else: - try: # Check if config file has the correct structure - config["atvs"], config["apikey"], config["skip_categories"], config["channel_whitelist"] - except: # If not, ask to setup the program - print("invalid config file, please run with --setup") - sys.exit() - main.main( - config["atvs"], config["apikey"], config["skip_categories"], config["channel_whitelist"], args.debug - ) + config.validate() + main.main(config, args.debug) diff --git a/iSponsorBlockTV/macos_install.py b/iSponsorBlockTV/macos_install.py index 6429368..04be8be 100644 --- a/iSponsorBlockTV/macos_install.py +++ b/iSponsorBlockTV/macos_install.py @@ -1,7 +1,7 @@ import plistlib import os from . import config_setup - +"""Not updated to V2 yet, should still work. Here be dragons""" default_plist = { "Label": "com.dmunozv04iSponsorBlockTV", "RunAtLoad": True, diff --git a/iSponsorBlockTV/main.py b/iSponsorBlockTV/main.py index 439716c..fd68b42 100644 --- a/iSponsorBlockTV/main.py +++ b/iSponsorBlockTV/main.py @@ -1,148 +1,147 @@ import asyncio -import pyatv import aiohttp import time import logging -from . import api_helpers +from . import api_helpers, ytlounge +import traceback -class MyPushListener(pyatv.interface.PushListener): - task = None - apikey = None - rc = None +class DeviceListener: + def __init__(self, api_helper, config, screen_id, offset): + self.api_helper = api_helper + self.lounge_controller = ytlounge.YtLoungeApi(screen_id, config, api_helper) + self.offset = offset + self.cancelled = False - web_session = None - categories = ["sponsor"] - whitelist = [] + # Ensures that we have a valid auth token + async def refresh_auth_loop(self): + while True: + await asyncio.sleep(60 * 60 * 24) # Refresh every 24 hours + try: + await self.lounge_controller.refresh_auth() + except: + # traceback.print_exc() + pass - def __init__(self, apikey, atv, categories, whitelist, web_session): - self.apikey = apikey - self.rc = atv.remote_control - self.web_session = web_session - self.categories = categories - self.whitelist = whitelist - self.atv = atv + async def is_available(self): + try: + return await self.lounge_controller.is_available() + except: + # traceback.print_exc() + return False - def playstatus_update(self, updater, playstatus): - logging.debug("Playstatus update" + str(playstatus)) + # Main subscription loop + async def loop(self): + lounge_controller = self.lounge_controller + while not lounge_controller.linked(): + try: + await lounge_controller.refresh_auth() + except: + # traceback.print_exc() + await asyncio.sleep(10) + + while not self.cancelled: + while not (await self.is_available()) and not self.cancelled: + await asyncio.sleep(10) + try: + await lounge_controller.connect() + except: + pass + while not lounge_controller.connected() and not self.cancelled: + await asyncio.sleep(10) + try: + await lounge_controller.connect() + except: + pass + # print(f"Connected to device {lounge_controller.screen_name}") + try: + print("Subscribing to lounge") + sub = await lounge_controller.subscribe_monitored(self) + await sub + print("Subscription ended") + except: + pass + + # Method called on playback state change + async def __call__(self, state): + logging.debug("Playstatus update" + str(state)) try: self.task.cancel() except: pass time_start = time.time() - self.task = asyncio.create_task( - process_playstatus( - playstatus, - self.apikey, - self.rc, - self.web_session, - self.categories, - self.atv, - time_start, - self.whitelist - ) - ) + self.task = asyncio.create_task(self.process_playstatus(state, time_start)) - def playstatus_error(self, updater, exception): - logging.error(exception) - print("stopped") - - -async def process_playstatus( - playstatus, apikey, rc, web_session, categories, atv, time_start, whitelist -): - logging.debug("App playing is:" + str(atv.metadata.app.identifier)) - if ( - playstatus.device_state == playstatus.device_state.Playing - and atv.metadata.app.identifier == "com.google.ios.youtube" - ): - vid_id = await api_helpers.get_vid_id( - playstatus.title, playstatus.artist, apikey, web_session - ) - if vid_id: - print(f"ID: {vid_id[0]}, Channel ID: {vid_id[1]}") - for i in whitelist: - if vid_id[1] == i["id"]: - print("Channel whitelisted, skipping.") - return - segments = await api_helpers.get_segments(vid_id[0], web_session, categories) + # Processes the playback state change + async def process_playstatus(self, state, time_start): + segments = [] + if state.videoId: + segments = await self.api_helper.get_segments(state.videoId) print(segments) - await time_to_segment( - segments, playstatus.position, rc, time_start, web_session - ) - else: - print("Could not find video id") + if state.state.value == 1 and segments: # Playing and has segments to skip + await self.time_to_segment(segments, state.currentTime, time_start) + # Finds the next segment to skip to and skips to it + async def time_to_segment(self, segments, position, time_start): + start_next_segment = None + next_segment = None + for segment in segments: + if position < 2 and ( + segment["start"] <= position < segment["end"] + ): + next_segment = segment + start_next_segment = position # different variable so segment doesn't change + break + if segment["start"] > position: + next_segment = segment + start_next_segment = next_segment["start"] + break + if start_next_segment: + time_to_next = start_next_segment - position - (time.time() - time_start) - self.offset + await self.skip(time_to_next, next_segment["end"], next_segment["UUID"]) -async def time_to_segment(segments, position, rc, time_start, web_session): - position = position + (time.time() - time_start) - for segment in segments: - if position < 2 and ( - position >= segment["start"] and position < segment["end"] - ): - next_segment = [position, segment["end"]] - break - if segment["start"] > position: - next_segment = segment - break - time_to_next = next_segment["start"] - position - await skip(time_to_next, next_segment["end"], next_segment["UUID"], rc, web_session) + # Skips to the next segment (waits for the time to pass) + async def skip(self, time_to, position, UUID): + await asyncio.sleep(time_to) + asyncio.create_task(self.lounge_controller.seek_to(position)) + asyncio.create_task( + self.api_helper.mark_viewed_segments(UUID) + ) # Don't wait for this to finish - -async def skip(time_to, position, UUID, rc, web_session): - await asyncio.sleep(time_to) - await rc.set_position(position) - # await api_helpers.viewed_segments(UUID, web_session) DISABLED FOR NOW - - -async def connect_atv(loop, identifier, airplay_credentials): - """Find a device and print what is playing.""" - print("Discovering devices on network...") - atvs = await pyatv.scan(loop, identifier=identifier) - - if not atvs: - print("No device found, will retry") - return - - config = atvs[0] - config.set_credentials(pyatv.Protocol.AirPlay, airplay_credentials) - - print(f"Connecting to {config.address}") - return await pyatv.connect(config, loop) - - -async def loop_atv(event_loop, atv_config, apikey, categories, whitelist, web_session): - identifier = atv_config["identifier"] - airplay_credentials = atv_config["airplay_credentials"] - atv = await connect_atv(event_loop, identifier, airplay_credentials) - if atv: - listener = MyPushListener(apikey, atv, categories, whitelist, web_session) - - atv.push_updater.listener = listener - atv.push_updater.start() - print("Push updater started") - while True: - await asyncio.sleep(20) + # Stops the connection to the device + async def cancel(self): + self.cancelled = True try: - atv.metadata.app - except: - print("Reconnecting to Apple TV") - # reconnect to apple tv - atv = await connect_atv(event_loop, identifier, airplay_credentials) - if atv: - listener = MyPushListener(apikey, atv, categories, whitelist, web_session) - - atv.push_updater.listener = listener - atv.push_updater.start() - print("Push updater started") + self.task.cancel() + except Exception as e: + traceback.print_exc() -def main(atv_configs, apikey, categories, whitelist, debug): +async def finish(devices): + for i in devices: + await i.cancel() + + +def main(config, debug): loop = asyncio.get_event_loop_policy().get_event_loop() + tasks = [] # Save the tasks so the interpreter doesn't garbage collect them + devices = [] # Save the devices to close them later if debug: loop.set_debug(True) asyncio.set_event_loop(loop) - web_session = aiohttp.ClientSession() - for i in atv_configs: - loop.create_task(loop_atv(loop, i, apikey, categories, whitelist, web_session)) - loop.run_forever() + tcp_connector = aiohttp.TCPConnector(ttl_dns_cache=300) + web_session = aiohttp.ClientSession(loop=loop, connector=tcp_connector) + api_helper = api_helpers.ApiHelper(config, web_session) + for i in config.devices: + device = DeviceListener(api_helper, config, i.screen_id, i.offset) + devices.append(device) + tasks.append(loop.create_task(device.loop())) + tasks.append(loop.create_task(device.refresh_auth_loop())) + try: + loop.run_forever() + except KeyboardInterrupt as e: + print("Keyboard interrupt detected, cancelling tasks and exiting...") + traceback.print_exc() + loop.run_until_complete(finish(devices)) + finally: + loop.run_until_complete(web_session.close()) diff --git a/iSponsorBlockTV/setup-wizard-style.tcss b/iSponsorBlockTV/setup-wizard-style.tcss new file mode 100644 index 0000000..503b359 --- /dev/null +++ b/iSponsorBlockTV/setup-wizard-style.tcss @@ -0,0 +1,365 @@ +.container { + background: $boost; + margin: 1; + padding: 1 1 0 1; + height: auto; + width: 100%; +} + +.title { + text-style: bold underline; + dock: top; + background: $primary-background; + width: 100%; + +} +.subtitle{ + width: 100%; +} + +#setup-wizard{ + scrollbar-gutter: stable; +} + +.small-button{ + height: 3; + +} + +.button-100 { + width: 100%; +} +/* Exit screen */ + +ExitScreen { + align: center middle; +} + +#dialog-exit { + grid-size: 3; + grid-gutter: 1 2; + grid-rows: 1fr 3; + padding: 1 2; + width: 35%; + min-width: 50; + max-width: 70; + height: 11; + border: thick $background 80%; + background: $surface; +} + +#question { + column-span: 3; + height: 1fr; + width: 1fr; + content-align: center middle; +} +/* Device editor */ +EditDevice { + align: center middle; +} +#edit-device-container { + padding: 1 2 0 2; + background: $surface; + border: thick $background 80%; + height: 17; + width: 50%; + min-width: 40; +} + +#device-id-container{ + grid-size: 4; + grid-gutter: 1 2; + grid-rows: 1fr 3; + width: 100%; + margin-right: 1; +} +#device-id-input{ + column-span: 3; + height: 1fr; + width: 1fr; + content-align: center middle; +} +#device-id-view{ + width: 1fr; +} +#device-offset-container{ + width: 100%; +} +#device-offset-input{ + width: 12; +} +#device-offset-slider{ + width: 100%; + margin-right: 12; +} + +/* devices */ +#devices-manager { + min-height: 4; + height: auto; + max-height: 70%; + width: 100%; + scrollbar-gutter: stable; + overflow-y: auto; + +} + +Element { + background: $panel; + border-top: solid $panel-lighten-2; + layout: horizontal; + height: 2; + width: 100%; + margin: 0 1 0 1; + padding: 0; +} +Element > .element-name { + offset: 0 -1; + padding: 0; + width: 100%; + align: left middle; + text-align: left; + +} +Element > .element-remove { + dock: right; + padding: 0; + align: left middle; + text-align: center; + width: 8; + min-width: 8; + margin: 0 1 0 0; +} + +#add-device { + text-style: bold; + width: 100%; + align: left middle; + text-align: center; + dock: left; + + text-align: left; +} +#add-device-button-container{ + height: 1; + width: 100%; + margin: 1 1 0 1; +} + + + +/* Add devices */ +#add-device-container{ + border: thick $background 80%; + background: $surface; + padding: 1 2 0 2; + height: auto; + width: 50%; + min-width: 40; +} +#add-device-switch-buttons{ + grid-size: 2 1; + height: 5; + width: 100%; + padding: 1 1; +} +#add-device-pin-container{ + height: auto; +} +#add-device-dial-container{ + height: auto; +} +#pairing-code-input.-valid { + border: tall $success 60%; +} + +#pairing-code-input.-valid:focus { + border: tall $success; +} + +#add-device-pin-add-button{ + margin: 1 1 0 1; + width: 100%; +} +#add-device-dial-add-button{ + margin: 1 1 0 1; + width: 100%; +} +#add-device-info{ + height: auto; + min-height: 0; + width: 100%; + padding: 0 1; +} + +/* ApiKey */ +#api-key-grid{ + grid-size: 4; + grid-gutter: 1 2; + grid-rows: 1fr 3; + padding: 1 1; + width: 100%; + height: 5; +} +#api-key-input{ + column-span: 3; + height: 1fr; + width: 1fr; + content-align: center middle; +} +/* Skip Categories */ +#skip-categories-manager{ + min-height: 10; + height: auto; + max-height: 70%; + width: 100%; + scrollbar-gutter: stable; +} +#skip-categories-selection-list{ + height: auto; + width: 100%; + background: $boost; + margin: 1; +} + +/* Segment count tracking */ +#skip-count-tracking-switch{ + margin: 1; +} + +/* Channel Whitelist */ + +#channel-whitelist-manager { + min-height: 5; + height: auto; + max-height: 70%; + width: 100%; + scrollbar-gutter: stable; + overflow-y: auto; +} + +#add-channel { + text-style: bold; + width: 100%; + align: left middle; + text-align: center; + dock: left; + + text-align: left; +} +#add-channel-button-container{ + height: 1; + width: 100%; + margin: 1 1 0 1; +} + + +/* Add Channel */ +AddChannel{ + align: center middle; + overflow-y: auto; + background: $background 60%; +} +#add-channel-container{ + padding: 1 2 0 2; + border: thick $background 80%; + background: $surface; + height: auto; + width: 50%; + min-width: 40; + +} +#add-channel-switch-buttons{ + grid-size: 2 1; + height: 5; + width: 100%; + padding: 1 1; +} +.button-switcher{ + width: 100%; + text-align: center; +} +#add-channel-switcher{ + height: auto; + width: auto; +} +#add-channel-switcher-container{ + height: auto; + width: 100%; +} +#add-channel-search-container{ + height: auto; +} +#add-channel-search-inputs{ + height: 3; + width: 100%; + grid-size: 4 1; + margin: 0 1 0 0; + /* padding: 0 1; */ +} +#channel-name-input-search{ + width: 100%; + height: auto; + column-span: 3; +} +#search-channel-button{ + width: 1fr; + height: auto; + column-span: 1; + margin: 0 0 0 0; +} +#channel-search-results{ + height: auto; + width: 100%; + background: $boost; + margin: 1; +} + +#add-channel-info{ + height: auto; + width: 100%; + margin: 0 1; +} +#add-channel-id-container{ + height: auto; +} +#channel-name-input-id{ + margin: 1 0; +} + +#add-channel-search-no-key{ + padding: 0 0 1 1; +} + +/* Mute/Skip ads */ +#ad-skip-mute-container{ + padding: 1; + height: auto; +} + +/* Migrate screen */ +MigrationScreen { + align: center middle; +} + + +#dialog-migration { + grid-size: 2; + grid-gutter: 1 2; + grid-rows: 1fr 3; + padding: 1 2; + width: 35%; + min-width: 50; + max-width: 70; + height: 11; + border: thick $background 80%; + background: $surface; +} + +#question-migrate { + column-span: 2; + height: 1fr; + width: 1fr; + content-align: center middle; +} \ No newline at end of file diff --git a/iSponsorBlockTV/setup_wizard.py b/iSponsorBlockTV/setup_wizard.py new file mode 100644 index 0000000..62ac2df --- /dev/null +++ b/iSponsorBlockTV/setup_wizard.py @@ -0,0 +1,701 @@ +import aiohttp +import asyncio +import copy +# Textual imports (Textual is awesome!) +from textual import on +from textual.app import App, ComposeResult +from textual.containers import ScrollableContainer, Grid, Container, Vertical, Horizontal +from textual.events import Click +from textual.screen import Screen +from textual.validation import Function +from textual.widgets import Button, Footer, Header, Static, Label, Input, SelectionList, Checkbox, ContentSwitcher, \ + RadioSet, RadioButton +from textual.widgets.selection_list import Selection +from textual_slider import Slider +# Local imports +from . import api_helpers, ytlounge +from .constants import skip_categories + + +def _validate_pairing_code(pairing_code: str) -> bool: + try: + pairing_code = pairing_code.replace("-", "").replace(" ", "") + int(pairing_code) + return len(pairing_code) == 12 + except ValueError: + return False # not a number + + +class ModalWithClickExit(Screen): + """A modal screen that exits when clicked outside its bounds. + https://discord.com/channels/1026214085173461072/1033754296224841768/1136015817356611604""" + DEFAULT_CSS = """ + ModalWithClickExit { + align: center middle; + layout: vertical; + overflow-y: auto; + background: $background 60%; + } + """ + + @on(Click) + def close_out_bounds(self, event: Click) -> None: + if self.get_widget_at(event.screen_x, event.screen_y)[0] is self: + self.dismiss() + + +class Element(Static): + """Base class for elements (devices and channels). + It has a name and a remove button. + """ + + def __init__(self, element: dict, tooltip: str = None, **kwargs) -> None: + super().__init__(**kwargs) + self.element_data = element + self.element_name = "" + self.process_values_from_data() + self.tooltip = tooltip + + def process_values_from_data(self): + pass + + def compose(self) -> ComposeResult: + yield Button(label=self.element_name, classes="element-name", disabled=True, id="element-name") + yield Button("Remove", classes="element-remove", variant="error", id="element-remove") + + def on_mount(self) -> None: + if self.tooltip: + self.query_one(".element-name").tooltip = self.tooltip + self.query_one(".element-name").disabled = False + + +class Device(Element): + """A device element.""" + + def process_values_from_data(self): + print("HIIII") + print(self.element_data) + if "name" in self.element_data and self.element_data["name"]: + self.element_name = self.element_data["name"] + else: + self.element_name = f"Unnamed device with id {self.element_data['screen_id'][:5]}...{self.element_data['screen_id'][-5:]}" + + +class Channel(Element): + """A channel element.""" + + def process_values_from_data(self): + if "name" in self.element_data: + self.element_name = self.element_data["name"] + else: + self.element_name = f"Unnamed channel with id {self.element_data['channel_id']}" + + +class ChannelRadio(RadioButton): + """A radio button for a channel.""" + + def __init__(self, channel: tuple, **kwargs) -> None: + label = f"{channel[1]} - Subs: {channel[2]}" + super().__init__(label=label, **kwargs) + self.channel_data = channel + + +class MigrationScreen(ModalWithClickExit): + """Screen with a dialog to remove old ATVS config.""" + BINDINGS = [("escape", "dismiss()", "Cancel"), + ("s", "remove_and_save", "Remove and save"), + ("q,ctrl+c", "exit", "Exit")] + AUTO_FOCUS = "#exit-save" + + def compose(self) -> ComposeResult: + yield Grid( + Label( + "Welcome to the new configurator! You seem to have the legacy 'atvs' entry on your config file, do you want to remove it?\n(The app won't start with it present)", + id="question", classes="button-100"), + Button("Remove and save", variant="primary", id="migrate-remove-save", classes="button-100"), + Button("Don't remove", variant="error", id="migrate-no-change", classes="button-100"), + id="dialog-migration", + ) + + def action_exit(self) -> None: + self.app.exit() + + @on(Button.Pressed, "#migrate-no-change") + def action_no_change(self) -> None: + self.app.pop_screen() + + @on(Button.Pressed, "#migrate-remove-save") + def action_remove_and_save(self) -> None: + del self.app.config.atvs + self.app.config.save() + self.app.pop_screen() + + +class ExitScreen(ModalWithClickExit): + """Screen with a dialog to exit.""" + BINDINGS = [("escape", "dismiss()", "Cancel"), + ("s", "save", "Save"), + ("q,ctrl+c", "exit", "Exit")] + AUTO_FOCUS = "#exit-save" + + def compose(self) -> ComposeResult: + yield Grid( + Label("Are you sure you want to exit without saving?", id="question", classes="button-100"), + Button("Save", variant="success", id="exit-save", classes="button-100"), + Button("Don't save", variant="error", id="exit-no-save", classes="button-100"), + Button("Cancel", variant="primary", id="exit-cancel", classes="button-100"), + id="dialog-exit", + ) + + def action_exit(self) -> None: + self.app.exit() + + def action_save(self) -> None: + self.app.config.save() + self.app.exit() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "exit-no-save": + self.app.exit() + elif event.button.id == "exit-save": + self.app.config.save() + self.app.exit() + else: + self.app.pop_screen() + + +class AddDevice(ModalWithClickExit): + """Screen with a dialog to add a device, either with a pairing code or with lan discovery.""" + BINDINGS = [("escape", "dismiss({})", "Return")] + + def __init__(self, config, **kwargs) -> None: + super().__init__(**kwargs) + self.config = config + web_session = aiohttp.ClientSession() + self.api_helper = api_helpers.ApiHelper(config, web_session) + self.devices_discovered_dial = [] + + def compose(self) -> ComposeResult: + with Container(id="add-device-container"): + yield Label("Add Device", classes="title") + with Grid(id="add-device-switch-buttons"): + yield Button("Add with pairing code", id="add-device-pin-button", classes="button-switcher") + yield Button("Add with lan discovery", id="add-device-dial-button", classes="button-switcher") + with ContentSwitcher(id="add-device-switcher", initial="add-device-pin-container"): + with Container(id="add-device-pin-container"): + yield Input(placeholder="Pairing Code (found in Settings - Link with TV code)", + id="pairing-code-input", + validators=[ + Function(_validate_pairing_code, "Invalid pairing code format") + ] + ) + yield Input(placeholder="Device Name (auto filled if empty/optional)", id="device-name-input") + yield Button("Add", id="add-device-pin-add-button", variant="success", disabled=True) + yield Label(id="add-device-info") + with Container(id="add-device-dial-container"): + yield Label( + "Make sure your device is on the same network as this computer\nIf it isn't showing up, try restarting the app.\nIf running in docker, make sure to use `--network=host`\nTo refresh the list, close and open the dialog again", + classes="subtitle") + yield SelectionList(("Searching for devices...", "", False), id="dial-devices-list", disabled=True) + yield Button("Add selected devices", id="add-device-dial-add-button", variant="success", + disabled=True) + + async def on_mount(self) -> None: + self.devices_discovered_dial = [] + asyncio.create_task(self.task_discover_devices()) + + async def task_discover_devices(self): + devices_found = await self.api_helper.discover_youtube_devices_dial() + list_widget: SelectionList = self.query_one("#dial-devices-list") + list_widget.clear_options() + if devices_found: + # print(devices_found) + devices_found_parsed = [] + for index, i in enumerate(devices_found): + devices_found_parsed.append(Selection(i["name"], index, False)) + list_widget.add_options(devices_found_parsed) + self.query_one("#dial-devices-list").disabled = False + self.devices_discovered_dial = devices_found + else: + list_widget.add_option(("No devices found", "", False)) + + @on(Button.Pressed, "#add-device-switch-buttons > *") + def handle_switch_buttons(self, event: Button.Pressed) -> None: + button_ = event.button.id + self.query_one("#add-device-switcher").current = event.button.id.replace("-button", "-container") + + @on(Input.Changed, "#pairing-code-input") + def changed_pairing_code(self, event: Input.Changed): + self.query_one("#add-device-button").disabled = not event.validation_result.is_valid + + @on(Input.Submitted, "#pairing-code-input") + @on(Button.Pressed, "#add-device-pin-add-button") + async def handle_add_device_pin(self) -> None: + self.query_one("#add-device-button").disabled = True + lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV") + pairing_code = self.query_one("#pairing-code-input").value + pairing_code = int(pairing_code.replace("-", "").replace(" ", "")) # remove dashes and spaces + device_name = self.parent.query_one("#device-name-input").value + paired = False + try: + paired = await lounge_controller.pair(pairing_code) + except: + pass + if paired: + device = { + "screen_id": lounge_controller.auth.screen_id, + "name": device_name if device_name else lounge_controller.screen_name, + "offset": 0, + } + self.query_one("#pairing-code-input").value = "" + self.query_one("#device-name-input").value = "" + self.query_one("#add-device-info").update(f"[#00ff00][b]Successfully added {device['name']}") + self.dismiss([device]) + else: + self.query_one("#pairing-code-input").value = "" + self.query_one("#add-device-button").disabled = False + self.query_one("#add-device-info").update("[#ff0000]Failed to add device") + + @on(Button.Pressed, "#add-device-dial-add-button") + def handle_add_device_dial(self) -> None: + list_widget: SelectionList = self.query_one("#dial-devices-list") + selected_devices = list_widget.selected + devices = [] + for i in selected_devices: + devices.append(self.devices_discovered_dial[i]) + self.dismiss(devices) + + @on(SelectionList.SelectedChanged, "#dial-devices-list") + def changed_device_list(self, event: SelectionList.SelectedChanged): + self.query_one("#add-device-dial-add-button").disabled = not event.selection_list.selected + + +class AddChannel(ModalWithClickExit): + """Screen with a dialog to add a channel, either using search or with a channel id.""" + BINDINGS = [("escape", "dismiss(())", "Return")] + + def __init__(self, config, **kwargs) -> None: + super().__init__(**kwargs) + self.config = config + web_session = aiohttp.ClientSession() + self.api_helper = api_helpers.ApiHelper(config, web_session) + + def compose(self) -> ComposeResult: + with Container(id="add-channel-container"): + yield Label("Add Channel", classes="title") + yield Label( + "Select a method to add a channel. Adding via search only works if a YouTube api key has been set", + id="add-channel-label", classes="subtitle") + with Grid(id="add-channel-switch-buttons"): + yield Button("Add by channel name", id="add-channel-search-button", classes="button-switcher") + yield Button("Add by channel ID", id="add-channel-id-button", classes="button-switcher") + yield Label(id="add-channel-info", classes="subtitle") + with ContentSwitcher(id="add-channel-switcher", initial="add-channel-search-container"): + with Vertical(id="add-channel-search-container"): + if self.config.apikey: + with Grid(id="add-channel-search-inputs"): + yield Input(placeholder="Enter channel name", id="channel-name-input-search") + yield Button("Search", id="search-channel-button", variant="success") + yield RadioSet(RadioButton(label="Search to see results", disabled=True), + id="channel-search-results") + yield Button("Add", id="add-channel-button-search", variant="success", disabled=True, + classes="button-100") + else: + yield Label( + "[#ff0000]No api key set, cannot search for channels. You can add it the config section below", + id="add-channel-search-no-key", classes="subtitle") + with Vertical(id="add-channel-id-container"): + yield Input(placeholder="Enter channel ID (example: UCuAXFkgsw1L7xaCfnd5JJOw)", + id="channel-id-input") + yield Input(placeholder="Enter channel name (only used to display in the config file)", + id="channel-name-input-id") + yield Button("Add", id="add-channel-button-id", variant="success", classes="button-100") + + @on(RadioSet.Changed, "#channel-search-results") + def handle_radio_set_changed(self, event: RadioSet.Changed) -> None: + self.query_one("#add-channel-button-search").disabled = False + + @on(Button.Pressed, "#add-channel-switch-buttons > *") + def handle_switch_buttons(self, event: Button.Pressed) -> None: + button_ = event.button.id + self.query_one("#add-channel-switcher").current = event.button.id.replace("-button", "-container") + + @on(Button.Pressed, "#search-channel-button") + @on(Input.Submitted, "#channel-name-input-search") + async def handle_search_channel(self) -> None: + channel_name = self.query_one("#channel-name-input-search").value + if not channel_name: + self.query_one("#add-channel-info").update("[#ff0000]Please enter a channel name") + return + self.query_one("#search-channel-button").disabled = True + self.query_one("#add-channel-info").update("Searching...") + self.query_one("#add-channel-button-search").disabled = True + self.query_one("#channel-search-results").remove_children() + try: + channels_list = await self.api_helper.search_channels(channel_name) + except: + self.query_one("#add-channel-info").update("[#ff0000]Failed to search for channel") + self.query_one("#search-channel-button").disabled = False + return + for i in channels_list: + self.query_one("#channel-search-results").mount(ChannelRadio(i)) + if channels_list: + self.query_one("#search-channel-button").disabled = False + self.query_one("#add-channel-info").update("") + + @on(Button.Pressed, "#add-channel-button-search") + def handle_add_channel_search(self) -> None: + channel = self.query_one("#channel-search-results").pressed_button.channel_data + if not channel: + self.query_one("#add-channel-info").update("[#ff0000]Please select a channel") + return + self.query_one("#add-channel-info").update("Adding...") + self.dismiss(channel) + + @on(Button.Pressed, "#add-channel-button-id") + @on(Input.Submitted, "#channel-id-input") + @on(Input.Submitted, "#channel-name-input-id") + def handle_add_channel_id(self) -> None: + channel_id = self.query_one("#channel-id-input").value + channel_name = self.query_one("#channel-name-input-id").value + if not channel_id: + self.query_one("#add-channel-info").update("[#ff0000]Please enter a channel ID") + return + if not channel_name: + channel_name = channel_id + channel = (channel_id, channel_name, "hidden") + self.query_one("#add-channel-info").update("Adding...") + self.dismiss(channel) + + +class EditDevice(ModalWithClickExit): + """Screen with a dialog to edit a device. Used by the DevicesManager.""" + BINDINGS = [("escape", "close_screen_saving", "Return")] + + def __init__(self, device: Element, **kwargs) -> None: + super().__init__(**kwargs) + self.device_data = device.element_data + self.device_widget = device + + def action_close_screen_saving(self) -> None: + self.dismiss() + + def dismiss(self) -> None: + self.device_data["name"] = self.query_one("#device-name-input").value + self.device_data["screen_id"] = self.query_one("#device-id-input").value + self.device_data["offset"] = int(self.query_one("#device-offset-input").value) + super().dismiss(self.device_widget) + + def compose(self) -> ComposeResult: + name = self.device_data.get("name", "") + offset = self.device_data.get("offset", 0) + with Container(id="edit-device-container"): + yield Label("Edit device (ESCAPE to exit)", classes="title") + yield Label("Device name") + yield Input(placeholder="Device name", id="device-name-input", value=name) + yield Label("Device screen id") + with Grid(id="device-id-container"): + yield Input(placeholder="Device id", id="device-id-input", value=self.device_data["screen_id"], + password=True) + yield Button("Show id", id="device-id-view") + yield Label("Device offset (in milliseconds)") + with Horizontal(id="device-offset-container"): + yield Input(id="device-offset-input", value=str(offset)) + yield Slider(name="Device offset", id="device-offset-slider", min=0, max=2000, step=100, value=offset) + + def on_slider_changed(self, event: Slider.Changed) -> None: + input = self.query_one("#device-offset-input") + with input.prevent(Input.Changed): + input.value = str(event.slider.value) + + def on_input_changed(self, event: Input.Changed): + if event.input.id == "device-offset-input": + value = event.input.value + if value.isdigit(): + value = int(value) + slider = self.query_one("#device-offset-slider") + with slider.prevent(Slider.Changed): + self.query_one("#device-offset-slider").value = value + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "device-id-view": + if "Show" in event.button.label: + event.button.label = "Hide id" + self.query_one("#device-id-input").password = False + else: + event.button.label = "Show id" + self.query_one("#device-id-input").password = True + + +class DevicesManager(Vertical): + """Manager for devices, allows to add, edit and remove devices.""" + def __init__(self, config, **kwargs) -> None: + super().__init__(**kwargs) + self.config = config + self.devices = config.devices + + def compose(self) -> ComposeResult: + yield Label("Devices", classes="title") + with Horizontal(id="add-device-button-container"): + yield Button("Add Device", id="add-device", classes="button-100") + for device in self.devices: + yield Device(device, tooltip="Click to edit") + + def new_devices(self, device_data) -> None: + if device_data: + device_widget = None + for i in device_data: + self.devices.append(i) + device_widget = Device(i, tooltip="Click to edit") + self.mount(device_widget) + device_widget.focus(scroll_visible=True) + + def edit_device(self, device_widget: Element) -> None: + device_widget.process_values_from_data() + device_widget.query_one("#element-name").label = device_widget.element_name + + @on(Button.Pressed, "#element-remove") + def remove_channel(self, event: Button.Pressed): + channel_to_remove: Element = event.button.parent + self.config.channel_whitelist.remove(channel_to_remove.element_data) + channel_to_remove.remove() + + @on(Button.Pressed, "#add-device") + def add_device(self, event: Button.Pressed): + self.app.push_screen(AddDevice(self.config), callback=self.new_devices) + + @on(Button.Pressed, "#element-name") + def edit_channel(self, event: Button.Pressed): + channel_to_edit: Element = event.button.parent + self.app.push_screen(EditDevice(channel_to_edit), callback=self.edit_device) + + +class ApiKeyManager(Vertical): + """Manager for the YouTube Api Key.""" + def __init__(self, config, **kwargs) -> None: + super().__init__(**kwargs) + self.config = config + + def compose(self) -> ComposeResult: + yield Label("YouTube Api Key", classes="title") + yield Label( + "You can get a YouTube Api Key from the [link=https://console.developers.google.com/apis/credentials]Google Cloud Console[/link]") + with Grid(id="api-key-grid"): + yield Input(placeholder="YouTube Api Key", id="api-key-input", password=True, value=self.config.apikey) + yield Button("Show key", id="api-key-view") + + @on(Input.Changed, "#api-key-input") + def changed_api_key(self, event: Input.Changed): + self.config.apikey = event.input.value + # try: # ChannelWhitelist might not be mounted + # self.app.query_one("#warning-no-key").display = not self.config.apikey + # except: + # pass + + @on(Button.Pressed, "#api-key-view") + def pressed_api_key_view(self, event: Button.Pressed): + if "Show" in event.button.label: + event.button.label = "Hide key" + self.query_one("#api-key-input").password = False + else: + event.button.label = "Show key" + self.query_one("#api-key-input").password = True + + +class SkipCategoriesManager(Vertical): + """Manager for skip categories, allows to select which categories to skip.""" + def __init__(self, config, **kwargs) -> None: + super().__init__(**kwargs) + self.config = config + + def compose(self) -> ComposeResult: + yield Label("Skip Categories", classes="title") + yield Label("Select the categories you want to skip", classes="subtitle") + skip_categories_parsed = [] + for i in skip_categories: + name, value = i + if value in self.config.skip_categories: + skip_categories_parsed.append((name, value, True)) + else: + skip_categories_parsed.append((name, value, False)) + # print(skip_categories_parsed) + yield SelectionList(*skip_categories_parsed, id="skip-categories-compact-list") + + @on(SelectionList.SelectedChanged, "#skip-categories-compact-list") + def changed_skip_categories(self, event: SelectionList.SelectedChanged): + self.config.skip_categories = event.selection_list.selected + + +class SkipCountTrackingManager(Vertical): + """Manager for skip count tracking, allows to enable/disable skip count tracking.""" + def __init__(self, config, **kwargs) -> None: + super().__init__(**kwargs) + self.config = config + + def compose(self) -> ComposeResult: + yield Label("Skip count tracking", classes="title") + yield Label( + "This feature tracks which segments you have skipped to let users know how much their submission has helped others and used as a metric along with upvotes to ensure that spam doesn't get into the database. The program sends a message to the sponsor block server each time you skip a segment. Hopefully most people don't change this setting so that the view numbers are accurate. :)", + classes="subtitle", id="skip-count-tracking-subtitle") + yield Checkbox(value=self.config.skip_count_tracking, id="skip-count-tracking-switch", + label="Enable skip count tracking") + + @on(Checkbox.Changed, "#skip-count-tracking-switch") + def changed_skip_tracking(self, event: Checkbox.Changed): + self.config.skip_count_tracking = event.checkbox.value + + +class AdSkipMuteManager(Vertical): + """Manager for ad skip/mute, allows to enable/disable ad skip/mute.""" + def __init__(self, config, **kwargs) -> None: + super().__init__(**kwargs) + self.config = config + + def compose(self) -> ComposeResult: + yield Label("Skip/Mute ads", classes="title") + yield Label( + "This feature allows you to automatically mute and/or skip native YouTube ads. Skipping ads only works if that ad shows the 'Skip Ad' button, if it doesn't then it will only be able to be muted.", + classes="subtitle", id="skip-count-tracking-subtitle") + with Horizontal(id="ad-skip-mute-container"): + yield Checkbox(value=self.config.mute_ads, id="mute-ads-switch", + label="Enable skipping ads") + yield Checkbox(value=self.config.skip_ads, id="skip-ads-switch", + label="Enable muting ads") + + @on(Checkbox.Changed, "#mute-ads-switch") + def changed_mute(self, event: Checkbox.Changed): + self.config.mute_ads = event.checkbox.value + + @on(Checkbox.Changed, "#skip-ads-switch") + def changed_skip(self, event: Checkbox.Changed): + self.config.skip_ads = event.checkbox.value + + +class ChannelWhitelistManager(Vertical): + """Manager for channel whitelist, allows to add/remove channels from the whitelist.""" + def __init__(self, config, **kwargs) -> None: + super().__init__(**kwargs) + self.config = config + + def compose(self) -> ComposeResult: + yield Label("Channel Whitelist", classes="title") + yield Label( + "This feature allows to whitelist channels from being skipped. This feature is automatically disabled when no channels have been specified.", + classes="subtitle", id="channel-whitelist-subtitle") + yield Label(":warning: [#FF0000]You need to set your YouTube Api Key in order to use this feature", + id="warning-no-key") + with Horizontal(id="add-channel-button-container"): + yield Button("Add Channel", id="add-channel", classes="button-100") + for channel in self.config.channel_whitelist: + yield Channel(channel) + + def on_mount(self) -> None: + self.app.query_one("#warning-no-key").display = (not self.config.apikey) and bool(self.config.channel_whitelist) + def new_channel(self, channel: tuple) -> None: + if channel: + channel_dict = { + "id": channel[0], + "name": channel[1], + } + self.config.channel_whitelist.append(channel_dict) + channel_widget = Channel(channel_dict) + self.mount(channel_widget) + channel_widget.focus(scroll_visible=True) + self.app.query_one("#warning-no-key").display = (not self.config.apikey) and bool( + self.config.channel_whitelist) + + @on(Button.Pressed, "#element-remove") + def remove_channel(self, event: Button.Pressed): + channel_to_remove: Element = event.button.parent + self.config.channel_whitelist.remove(channel_to_remove.element_data) + channel_to_remove.remove() + self.app.query_one("#warning-no-key").display = (not self.config.apikey) and bool( + self.config.channel_whitelist) + + @on(Button.Pressed, "#add-channel") + def add_channel(self, event: Button.Pressed): + self.app.push_screen(AddChannel(self.config), callback=self.new_channel) + + +class iSponsorBlockTVSetupMainScreen(Screen): + """Making this a separate screen to avoid a bug: https://github.com/Textualize/textual/issues/3221""" + TITLE = "iSponsorBlockTV" + SUB_TITLE = "Setup Wizard" + BINDINGS = [ + ("q,ctrl+c", "exit_modal", "Exit"), + ("s", "save", "Save") + ] + AUTO_FOCUS = None + + def __init__(self, config, **kwargs) -> None: + super().__init__(**kwargs) + self.dark = True + self.config = config + self.initial_config = copy.deepcopy(config) + + def compose(self) -> ComposeResult: + yield Header() + yield Footer() + with ScrollableContainer(id="setup-wizard"): + yield DevicesManager(config=self.config, id="devices-manager", classes="container") + yield SkipCategoriesManager(config=self.config, id="skip-categories-manager", classes="container") + yield SkipCountTrackingManager(config=self.config, id="count-segments-manager", classes="container") + yield AdSkipMuteManager(config=self.config, id="ad-skip-mute-manager", classes="container") + yield ChannelWhitelistManager(config=self.config, id="channel-whitelist-manager", classes="container") + yield ApiKeyManager(config=self.config, id="api-key-manager", classes="container") + + def on_mount(self) -> None: + if self.check_for_old_config_entries(): + self.app.push_screen(MigrationScreen()) + pass + + def action_save(self) -> None: + self.config.save() + self.initial_config = copy.deepcopy(self.config) + + def action_exit_modal(self) -> None: + if self.config != self.initial_config: + self.app.push_screen(ExitScreen()) + else: # No changes were made + self.app.exit() + + def check_for_old_config_entries(self) -> bool: + if hasattr(self.config, "atvs"): + return True + return False + + @on(Input.Changed, "#api-key-input") + def changed_api_key(self, event: Input.Changed): + print("HIIII") + try: # ChannelWhitelist might not be mounted + # Show if no api key is set and at least one channel is in the whitelist + self.app.query_one("#warning-no-key").display = (not event.input.value) and self.config.channel_whitelist + except: + pass + +class iSponsorBlockTVSetup(App): + CSS_PATH = "setup-wizard-style.tcss" # tcss is the recommended extension for textual css files + # Bindings for the whole app here, so they are available in all screens + BINDINGS = [ + ("q,ctrl+c", "exit_modal", "Exit"), + ("s", "save", "Save") + ] + + def __init__(self, config, **kwargs) -> None: + super().__init__(**kwargs) + self.config = config + self.main_screen = iSponsorBlockTVSetupMainScreen(config=self.config) + + def on_mount(self) -> None: + self.push_screen(self.main_screen) + + def action_save(self) -> None: + self.main_screen.action_save() + + def action_exit_modal(self) -> None: + self.main_screen.action_exit_modal() + + +def main(config): + app = iSponsorBlockTVSetup(config) + app.run() \ No newline at end of file diff --git a/iSponsorBlockTV/ytlounge.py b/iSponsorBlockTV/ytlounge.py new file mode 100644 index 0000000..ba06453 --- /dev/null +++ b/iSponsorBlockTV/ytlounge.py @@ -0,0 +1,114 @@ +import asyncio +import aiohttp +import pyytlounge + +create_task = asyncio.create_task + + +class YtLoungeApi(pyytlounge.YtLoungeApi): + def __init__(self, screen_id, config=None, api_helper=None): + super().__init__("iSponsorBlockTV") + self.auth.screen_id = screen_id + self.auth.lounge_id_token = None + self.api_helper = api_helper + self.volume_state = {} + self.subscribe_task = None + self.subscribe_task_watchdog = None + self.callback = None + if config: + self.mute_ads = config.mute_ads + self.skip_ads = config.skip_ads + + # Ensures that we still are subscribed to the lounge + async def _watchdog(self): + await asyncio.sleep(35) # YouTube sends at least a message every 30 seconds (no-op or any other) + try: + self.subscribe_task.cancel() + except Exception as e: + pass + + # Subscribe to the lounge and start the watchdog + async def subscribe_monitored(self, callback): + self.callback = callback + try: + self.subscribe_task_watchdog.cancel() + except: + pass # No watchdog task + self.subscribe_task = asyncio.create_task(super().subscribe(callback)) + self.subscribe_task_watchdog = asyncio.create_task(self._watchdog()) + return self.subscribe_task + + # Process a lounge subscription event + def _process_event(self, event_id: int, event_type: str, args): + # print(f"YtLoungeApi.__process_event({event_id}, {event_type}, {args})") + # (Re)start the watchdog + try: + self.subscribe_task_watchdog.cancel() + except: + pass + finally: + self.subscribe_task_watchdog = asyncio.create_task(self._watchdog()) + # A bunch of events useful to detect ads playing, and the next video before it starts playing (that way we can get the segments) + if event_type == "onStateChange": + data = args[0] + self.state.apply_state(data) + self._update_state() + # print(data) + # Unmute when the video starts playing + if self.mute_ads and data["state"] == "1": + create_task(self.mute(False, override=True)) + elif event_type == "nowPlaying": + data = args[0] + self.state = pyytlounge.PlaybackState(self._logger, data) + self._update_state() + # Unmute when the video starts playing + if self.mute_ads and data.get("state", "0") == "1": + create_task(self.mute(False, override=True)) + elif self.mute_ads and event_type == "onAdStateChange": + data = args[0] + if data["adState"] == '0': # Ad is not playing + create_task(self.mute(False, override=True)) + else: # Seen multiple other adStates, assuming they are all ads + create_task(self.mute(True, override=True)) + # Manages volume, useful since YouTube wants to know the volume when unmuting (even if they already have it) + elif event_type == "onVolumeChanged": + self.volume_state = args[0] + pass + # Gets segments for the next video before it starts playing + elif event_type == "autoplayUpNext": + if len(args) > 0 and (vid_id := args[0]["videoId"]): # if video id is not empty + create_task(self.api_helper.get_segments(vid_id)) + + # #Used to know if an ad is skippable or not + elif event_type == "adPlaying": + data = args[0] + # Gets segments for the next video (after the ad) before it starts playing + if vid_id := data["contentVideoId"]: + create_task(self.api_helper.get_segments(vid_id)) + + if data["isSkippable"] == "true": # YouTube uses strings for booleans + if self.skip_ads: + create_task(self.skip_ad()) + create_task(self.mute(False, override=True)) + elif self.mute_ads: + create_task(self.mute(True, override=True)) + else: + super()._process_event(event_id, event_type, args) + + # Set the volume to a specific value (0-100) + async def set_volume(self, volume: int) -> None: + await super()._command("setVolume", {"volume": volume}) + + # Mute or unmute the device (if the device already is in the desired state, nothing happens) + # mute: True to mute, False to unmute + # override: If True, the command is sent even if the device already is in the desired state + # TODO: Only works if the device is subscribed to the lounge + async def mute(self, mute: bool, override: bool = False) -> None: + if mute: + mute_str = "true" + else: + mute_str = "false" + if override or not (self.volume_state.get("muted", "false") == mute_str): + self.volume_state["muted"] = mute_str + # YouTube wants the volume when unmuting, so we send it + await super()._command("setVolume", {"volume": self.volume_state.get("volume", 100), "muted": mute_str}) diff --git a/main_tui.py b/main_tui.py new file mode 100644 index 0000000..5a417f0 --- /dev/null +++ b/main_tui.py @@ -0,0 +1,5 @@ +from iSponsorBlockTV import setup_wizard +from iSponsorBlockTV.helpers import Config + +config = Config("config.json") +setup_wizard.main(config) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 49fba26..2b1874f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,10 @@ miniaudio==1.57 -pyatv aiohttp -aiodns async-cache -argparse \ No newline at end of file +argparse +pyytlounge +textual +textual-slider +ssdp +rich +xmltodict \ No newline at end of file