[pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
This commit is contained in:
pre-commit-ci[bot]
2024-01-14 13:47:13 +00:00
parent dae27e7aa3
commit a4f0b5fffe
23 changed files with 543 additions and 240 deletions

View File

@@ -5,4 +5,4 @@ name = "python"
enabled = true
[analyzers.meta]
runtime_version = "3.x.x"
runtime_version = "3.x.x"

View File

@@ -7,7 +7,7 @@ assignees: ''
---
Before opening an issue make sure that there are no duplicates and that you are on the latest version.
Before opening an issue make sure that there are no duplicates and that you are on the latest version.
**Describe the bug**
A clear and concise description of what the bug is.

View File

@@ -15,10 +15,10 @@ on:
workflow_dispatch:
permissions:
permissions:
contents: read
packages: write
jobs:
build:
runs-on: ubuntu-latest
@@ -39,7 +39,7 @@ jobs:
type=ref,event=tag
type=ref,event=branch
type=schedule
# https://github.com/docker/setup-qemu-action
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
@@ -47,7 +47,7 @@ jobs:
- name: Set up Docker Buildx
id: buildx
uses: docker/setup-buildx-action@v2
- name: Login to DockerHub
if: github.event_name != 'pull_request'
uses: docker/login-action@v2

View File

@@ -34,4 +34,4 @@ jobs:
- name: Build package
run: python -m build
- name: Publish package
uses: pypa/gh-action-pypi-publish@release/v1
uses: pypa/gh-action-pypi-publish@release/v1

View File

@@ -27,4 +27,4 @@ jobs:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
repository: dmunozv04/isponsorblocktv
short-description: ${{ github.event.repository.description }}
short-description: ${{ github.event.repository.description }}

2
.gitignore vendored
View File

@@ -159,4 +159,4 @@ data/config.json
.DS_Store
.DS_Store
.DS_Store

View File

@@ -34,4 +34,4 @@ repos:
hooks:
- id: pycln
language_version: "3.11"
args: [--all]
args: [--all]

View File

@@ -16,4 +16,4 @@ RUN python -m compileall
COPY src .
ENTRYPOINT ["python3", "-u", "main.py"]
ENTRYPOINT ["python3", "-u", "main.py"]

View File

@@ -18,4 +18,4 @@
"name": ""
}
]
}
}

View File

@@ -5,4 +5,4 @@ services:
container_name: iSponsorBlockTV
restart: unless-stopped
volumes:
- /PATH_TO_YOUR_DATA_DIR:/app/data
- /PATH_TO_YOUR_DATA_DIR:/app/data

View File

@@ -7,4 +7,4 @@ rich==13.6.0
ssdp==1.3.0
textual==0.40.0
textual-slider==0.1.1
xmltodict==0.13.0
xmltodict==0.13.0

View File

@@ -1,9 +1,11 @@
from cache import AsyncLRU
from .conditional_ttl_cache import AsyncConditionalTTL
from . import constants, dial_client
from hashlib import sha256
from aiohttp import ClientSession
import html
from hashlib import sha256
from aiohttp import ClientSession
from cache import AsyncLRU
from . import constants, dial_client
from .conditional_ttl_cache import AsyncConditionalTTL
def list_to_tuple(function):
@@ -72,7 +74,13 @@ class ApiHelper:
@AsyncLRU(maxsize=10)
async def search_channels(self, channel):
channels = []
params = {"q": channel, "key": self.apikey, "part": "snippet", "type": "channel", "maxResults": "5"}
params = {
"q": channel,
"key": self.apikey,
"part": "snippet",
"type": "channel",
"maxResults": "5",
}
url = constants.Youtube_api + "search"
async with self.web_session.get(url, params=params) as resp:
data = await resp.json()
@@ -81,7 +89,11 @@ class ApiHelper:
for i in data["items"]:
# Get channel subscription number
params = {"id": i["snippet"]["channelId"], "key": self.apikey, "part": "statistics"}
params = {
"id": i["snippet"]["channelId"],
"key": self.apikey,
"part": "statistics",
}
url = constants.Youtube_api + "channels"
async with self.web_session.get(url, params=params) as resp:
channel_data = await resp.json()
@@ -89,20 +101,29 @@ class ApiHelper:
if channel_data["items"][0]["statistics"]["hiddenSubscriberCount"]:
sub_count = "Hidden"
else:
sub_count = int(channel_data["items"][0]["statistics"]["subscriberCount"])
sub_count = int(
channel_data["items"][0]["statistics"]["subscriberCount"]
)
sub_count = format(sub_count, "_")
channels.append((i["snippet"]["channelId"], i["snippet"]["channelTitle"], sub_count))
channels.append(
(i["snippet"]["channelId"], i["snippet"]["channelTitle"], sub_count)
)
return channels
@list_to_tuple # Convert list to tuple so it can be used as a key in the cache
@AsyncConditionalTTL(time_to_live=300, maxsize=10) # 5 minutes for non-locked segments
@AsyncConditionalTTL(
time_to_live=300, maxsize=10
) # 5 minutes for non-locked segments
async def get_segments(self, vid_id):
if await self.is_whitelisted(vid_id):
return [], True # Return empty list and True to indicate that the cache should last forever
return (
[],
True,
) # Return empty list and True to indicate that the cache should last forever
vid_id_hashed = sha256(vid_id.encode("utf-8")).hexdigest()[
:4
] # Hashes video id and gets the first 4 characters
:4
] # Hashes video id and gets the first 4 characters
params = {
"category": self.skip_categories,
"actionType": constants.SponsorBlock_actiontype,
@@ -110,13 +131,16 @@ class ApiHelper:
}
headers = {"Accept": "application/json"}
url = constants.SponsorBlock_api + "skipSegments/" + vid_id_hashed
async with self.web_session.get(url, headers=headers, params=params) as response:
async with self.web_session.get(
url, headers=headers, params=params
) as response:
response_json = await response.json()
if response.status != 200:
response_text = await response.text()
print(
f"Error getting segments for video {vid_id}, hashed as {vid_id_hashed}. "
f"Code: {response.status} - {response_text}")
f"Code: {response.status} - {response_text}"
)
return [], True
for i in response_json:
if str(i["videoID"]) == str(vid_id):
@@ -130,7 +154,9 @@ class ApiHelper:
ignore_ttl = True
try:
for i in response["segments"]:
ignore_ttl = ignore_ttl and i["locked"] == 1 # If all segments are locked, ignore ttl
ignore_ttl = (
ignore_ttl and i["locked"] == 1
) # If all segments are locked, ignore ttl
segment = i["segment"]
UUID = i["UUID"]
segment_dict = {"start": segment[0], "end": segment[1], "UUID": [UUID]}
@@ -143,7 +169,7 @@ class ApiHelper:
except Exception:
segment_before_end = -10
if (
segment_dict["start"] - segment_before_end < 1
segment_dict["start"] - segment_before_end < 1
): # Less than 1 second apart, combine them and skip them together
segment_dict["start"] = segment_before_start
segment_dict["UUID"].extend(segment_before_UUID)

View File

@@ -1,6 +1,7 @@
import datetime
from cache.key import KEY
from cache.lru import LRU
import datetime
"""MIT License
@@ -23,7 +24,7 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE."""
'''Modified code from https://github.com/iamsinghrajat/async-cache'''
"""Modified code from https://github.com/iamsinghrajat/async-cache"""
class AsyncConditionalTTL:
@@ -31,9 +32,9 @@ class AsyncConditionalTTL:
def __init__(self, time_to_live, maxsize):
super().__init__(maxsize=maxsize)
self.time_to_live = datetime.timedelta(
seconds=time_to_live
) if time_to_live else None
self.time_to_live = (
datetime.timedelta(seconds=time_to_live) if time_to_live else None
)
self.maxsize = maxsize
@@ -55,13 +56,13 @@ class AsyncConditionalTTL:
def __setitem__(self, key, value):
value, ignore_ttl = value # unpack tuple
ttl_value = (
datetime.datetime.now() + self.time_to_live
) if (self.time_to_live and not ignore_ttl) else None # ignore ttl if ignore_ttl is True
(datetime.datetime.now() + self.time_to_live)
if (self.time_to_live and not ignore_ttl)
else None
) # ignore ttl if ignore_ttl is True
super().__setitem__(key, (value, ttl_value))
def __init__(
self, time_to_live=60, maxsize=1024, skip_args: int = 0
):
def __init__(self, time_to_live=60, maxsize=1024, skip_args: int = 0):
"""
:param time_to_live: Use time_to_live as None for non expiring cache
@@ -73,7 +74,7 @@ class AsyncConditionalTTL:
def __call__(self, func):
async def wrapper(*args, **kwargs):
key = KEY(args[self.skip_args:], kwargs)
key = KEY(args[self.skip_args :], kwargs)
if key in self.ttl:
val = self.ttl[key]
else:

View File

@@ -1,13 +1,19 @@
import asyncio
import aiohttp
from . import api_helpers, ytlounge
async def pair_device():
try:
lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV")
pairing_code = input("Enter pairing code (found in Settings - Link with TV code): ")
pairing_code = int(pairing_code.replace("-", "").replace(" ", "")) # remove dashes and spaces
pairing_code = input(
"Enter pairing code (found in Settings - Link with TV code): "
)
pairing_code = int(
pairing_code.replace("-", "").replace(" ", "")
) # remove dashes and spaces
print("Pairing...")
paired = await lounge_controller.pair(pairing_code)
if not paired:
@@ -33,8 +39,14 @@ def main(config, debug: bool) -> None:
if hasattr(config, "atvs"):
print(
"The atvs config option is deprecated and has stopped working. Please read this for more information on "
"how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2")
if input("Do you want to remove the legacy 'atvs' entry (the app won't start with it present)? (y/n) ") == "y":
"how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2"
)
if (
input(
"Do you want to remove the legacy 'atvs' entry (the app won't start with it present)? (y/n) "
)
== "y"
):
del config["atvs"]
devices = config.devices
while not input(f"Paired with {len(devices)} Device(s). Add more? (y/n) ") == "n":
@@ -50,7 +62,12 @@ def main(config, debug: bool) -> None:
if input("API key already specified. Change it? (y/n) ") == "y":
apikey = input("Enter your API key: ")
else:
if input("API key only needed for the channel whitelist function. Add it? (y/n) ") == "y":
if (
input(
"API key only needed for the channel whitelist function. Add it? (y/n) "
)
== "y"
):
print(
"Get youtube apikey here: https://developers.google.com/youtube/registering_an_application"
)
@@ -65,31 +82,41 @@ def main(config, debug: bool) -> None:
"interaction poi_highlight intro outro preview filler music_offtopic]:\n"
)
skip_categories = categories.replace(",", " ").split(" ")
skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings
skip_categories = [
x for x in skip_categories if x != ""
] # Remove empty strings
else:
categories = input(
"Enter skip categories (space or comma sepparated) Options: [sponsor, selfpromo, exclusive_access, "
"interaction, poi_highlight, intro, outro, preview, filler, music_offtopic:\n"
)
skip_categories = categories.replace(",", " ").split(" ")
skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings
skip_categories = [
x for x in skip_categories if x != ""
] # Remove empty strings
config.skip_categories = skip_categories
channel_whitelist = config.channel_whitelist
if input("Do you want to whitelist any channels from being ad-blocked? (y/n) ") == "y":
if (
input("Do you want to whitelist any channels from being ad-blocked? (y/n) ")
== "y"
):
if not apikey:
print(
"WARNING: You need to specify an API key to use this function, otherwise the program will fail to "
"start.\nYou can add one by re-running this setup wizard.")
"start.\nYou can add one by re-running this setup wizard."
)
web_session = aiohttp.ClientSession()
api_helper = api_helpers.ApiHelper(config, web_session)
while True:
channel_info = {}
channel = input("Enter a channel name or \"/exit\" to exit: ")
channel = input('Enter a channel name or "/exit" to exit: ')
if channel == "/exit":
break
task = loop.create_task(api_helper.search_channels(channel, apikey, web_session))
task = loop.create_task(
api_helper.search_channels(channel, apikey, web_session)
)
loop.run_until_complete(task)
results = task.result()
if len(results) == 0:
@@ -123,7 +150,11 @@ def main(config, debug: bool) -> None:
config.channel_whitelist = channel_whitelist
config.skip_count_tracking = not input(
"Do you want to report skipped segments to sponsorblock. Only the segment UUID will be sent? (y/n) ") == "n"
config.skip_count_tracking = (
not input(
"Do you want to report skipped segments to sponsorblock. Only the segment UUID will be sent? (y/n) "
)
== "n"
)
print("Config finished")
config.save()

View File

@@ -6,16 +6,16 @@ SponsorBlock_api = "https://sponsor.ajay.app/api/"
Youtube_api = "https://www.googleapis.com/youtube/v3/"
skip_categories = (
('Sponsor', 'sponsor'),
('Self Promotion', 'selfpromo'),
('Intro', 'intro'),
('Outro', 'outro'),
('Music Offtopic', 'music_offtopic'),
('Interaction', 'interaction'),
('Exclusive Access', 'exclusive_access'),
('POI Highlight', 'poi_highlight'),
('Preview', 'preview'),
('Filler', 'filler'),
("Sponsor", "sponsor"),
("Self Promotion", "selfpromo"),
("Intro", "intro"),
("Outro", "outro"),
("Music Offtopic", "music_offtopic"),
("Interaction", "interaction"),
("Exclusive Access", "exclusive_access"),
("POI Highlight", "poi_highlight"),
("Preview", "preview"),
("Filler", "filler"),
)
youtube_client_blacklist = ["TVHTML5_FOR_KIDS"]

View File

@@ -1,21 +1,22 @@
"""Send out a M-SEARCH request and listening for responses."""
import asyncio
import socket
import ssdp
from ssdp import network
import xmltodict
from ssdp import network
'''Redistribution and use of the DIAL DIscovery And Launch protocol specification (the “DIAL Specification”),
with or without modification, are permitted provided that the following conditions are met: ● Redistributions of the
DIAL Specification must retain the above copyright notice, this list of conditions and the following disclaimer. ●
Redistributions of implementations of the DIAL Specification in source code form must retain the above copyright
notice, this list of conditions and the following disclaimer. ● Redistributions of implementations of the DIAL
Specification in binary form must include the above copyright notice. ● The DIAL mark, the NETFLIX mark and the names
of contributors to the DIAL Specification may not be used to endorse or promote specifications, software, products,
or any other materials derived from the DIAL Specification without specific prior written permission. The DIAL mark
is owned by Netflix and information on licensing the DIAL mark is available at www.dial-multiscreen.org.'''
"""Redistribution and use of the DIAL DIscovery And Launch protocol specification (the “DIAL Specification”),
with or without modification, are permitted provided that the following conditions are met: ● Redistributions of the
DIAL Specification must retain the above copyright notice, this list of conditions and the following disclaimer. ●
Redistributions of implementations of the DIAL Specification in source code form must retain the above copyright
notice, this list of conditions and the following disclaimer. ● Redistributions of implementations of the DIAL
Specification in binary form must include the above copyright notice. ● The DIAL mark, the NETFLIX mark and the names
of contributors to the DIAL Specification may not be used to endorse or promote specifications, software, products,
or any other materials derived from the DIAL Specification without specific prior written permission. The DIAL mark
is owned by Netflix and information on licensing the DIAL mark is available at www.dial-multiscreen.org."""
'''
"""
MIT License
Copyright (c) 2018 Johannes Hoppe
@@ -36,8 +37,8 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.'''
'''Modified code from https://github.com/codingjoe/ssdp/blob/main/ssdp/__main__.py'''
SOFTWARE."""
"""Modified code from https://github.com/codingjoe/ssdp/blob/main/ssdp/__main__.py"""
def get_ip():
@@ -45,17 +46,16 @@ def get_ip():
s.settimeout(0)
try:
# doesn't even have to be reachable
s.connect(('10.254.254.254', 1))
s.connect(("10.254.254.254", 1))
ip = s.getsockname()[0]
except Exception:
ip = '127.0.0.1'
ip = "127.0.0.1"
finally:
s.close()
return ip
class Handler(ssdp.aio.SSDP):
def __init__(self):
super().__init__()
self.devices = []
@@ -107,7 +107,9 @@ async def discover(web_session):
family, addr = network.get_best_family(bind, network.PORT)
loop = asyncio.get_event_loop()
ip_address = get_ip()
connect = loop.create_datagram_endpoint(handler, family=family, local_addr=(ip_address, None))
connect = loop.create_datagram_endpoint(
handler, family=family, local_addr=(ip_address, None)
)
transport, protocol = await connect
target = network.MULTICAST_ADDRESS_IPV4, network.PORT

View File

@@ -59,7 +59,9 @@ class Config:
sys.exit()
self.devices = [Device(i) for i in self.devices]
if not self.apikey and self.channel_whitelist:
raise ValueError("No youtube API key found and channel whitelist is not empty")
raise ValueError(
"No youtube API key found and channel whitelist is not empty"
)
if not self.skip_categories:
self.skip_categories = ["sponsor"]
print("No categories found, using default: sponsor")
@@ -79,11 +81,15 @@ class Config:
print("Creating data directory")
os.makedirs(self.data_dir)
else: # Running in docker without mounting the data dir
print("Running in docker without mounting the data dir, check the wiki for more information: "
"https://github.com/dmunozv04/iSponsorBlockTV/wiki/Installation#Docker")
print("This image has recently been updated to v2, and requires changes.",
"Please read this for more information on how to upgrade to V2:",
"https://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2")
print(
"Running in docker without mounting the data dir, check the wiki for more information: "
"https://github.com/dmunozv04/iSponsorBlockTV/wiki/Installation#Docker"
)
print(
"This image has recently been updated to v2, and requires changes.",
"Please read this for more information on how to upgrade to V2:",
"https://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2",
)
print("Exiting in 10 seconds...")
time.sleep(10)
sys.exit()
@@ -110,11 +116,22 @@ class Config:
def app_start():
# If env has a data dir use that, otherwise use the default
default_data_dir = os.getenv("iSPBTV_data_dir") or user_data_dir("iSponsorBlockTV", "dmunozv04")
default_data_dir = os.getenv("iSPBTV_data_dir") or user_data_dir(
"iSponsorBlockTV", "dmunozv04"
)
parser = argparse.ArgumentParser(description="iSponsorblockTV")
parser.add_argument("--data-dir", "-d", default=default_data_dir, help="data directory")
parser.add_argument("--setup", "-s", action="store_true", help="setup the program graphically")
parser.add_argument("--setup-cli", "-sc", action="store_true", help="setup the program in the command line")
parser.add_argument(
"--data-dir", "-d", default=default_data_dir, help="data directory"
)
parser.add_argument(
"--setup", "-s", action="store_true", help="setup the program graphically"
)
parser.add_argument(
"--setup-cli",
"-sc",
action="store_true",
help="setup the program in the command line",
)
parser.add_argument("--debug", action="store_true", help="debug mode")
args = parser.parse_args()

View File

@@ -1,10 +1,10 @@
import logging
from rich.logging import RichHandler
from rich.text import Text
from rich.style import Style
from rich.text import Text
'''
"""
Copyright (c) 2020 Will McGugan
Permission is hereby granted, free of charge, to any person obtaining a copy
@@ -25,7 +25,7 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Modified code from rich (https://github.com/textualize/rich)
'''
"""
class LogHandler(RichHandler):
@@ -40,7 +40,8 @@ class LogHandler(RichHandler):
show_path=False,
time_format="[%x %X]",
omit_repeated_times=True,
level_width=None)
level_width=None,
)
def add_filter_string(self, s):
self.filter_strings.append(s)
@@ -56,14 +57,17 @@ class LogHandler(RichHandler):
class LogRender:
def __init__(self, device_name,
log_name_len,
show_time=True,
show_level=False,
show_path=True,
time_format="[%x %X]",
omit_repeated_times=True,
level_width=8):
def __init__(
self,
device_name,
log_name_len,
show_time=True,
show_level=False,
show_path=True,
time_format="[%x %X]",
omit_repeated_times=True,
level_width=8,
):
self.filter_strings = []
self.log_name_len = log_name_len
self.device_name = device_name
@@ -76,15 +80,15 @@ class LogRender:
self._last_time = None
def __call__(
self,
console,
renderables,
log_time,
time_format=None,
level="",
path=None,
line_no=None,
link_path=None,
self,
console,
renderables,
log_time,
time_format=None,
level="",
path=None,
line_no=None,
link_path=None,
):
from rich.containers import Renderables
from rich.table import Table
@@ -95,7 +99,9 @@ class LogRender:
output.add_column(style="log.time")
if self.show_level:
output.add_column(style="log.level", width=self.level_width)
output.add_column(width=self.log_name_len, style=Style(color="yellow"), overflow="fold")
output.add_column(
width=self.log_name_len, style=Style(color="yellow"), overflow="fold"
)
output.add_column(ratio=1, style="log.message", overflow="fold")
if self.show_path and path:
output.add_column(style="log.path")
@@ -134,7 +140,9 @@ class LogRender:
class LogFormatter(logging.Formatter):
def __init__(self, fmt=None, datefmt=None, style="%", validate=True, filter_strings=None):
def __init__(
self, fmt=None, datefmt=None, style="%", validate=True, filter_strings=None
):
super().__init__(fmt, datefmt, style, validate)
self.filter_strings = filter_strings or []

View File

@@ -1,6 +1,8 @@
import plistlib
import os
import plistlib
from . import config_setup
"""Not updated to V2 yet, should still work. Here be dragons"""
default_plist = {
"Label": "com.dmunozv04iSponsorBlockTV",
@@ -40,8 +42,8 @@ def main():
create_plist(correct_path)
run_setup(correct_path + "/config.json")
print(
'Launch daemon installed. Please restart the computer to enable it or use:\n launchctl load '
'~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist'
"Launch daemon installed. Please restart the computer to enable it or use:\n launchctl load "
"~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist"
)
else:
if not os.path.exists(correct_path):

View File

@@ -1,12 +1,13 @@
import asyncio
import aiohttp
import time
import logging
import rich
import time
from signal import SIGINT, SIGTERM, signal
from typing import Optional
from signal import signal, SIGINT, SIGTERM
from . import api_helpers, ytlounge, logging_helpers
import aiohttp
import rich
from . import api_helpers, logging_helpers, ytlounge
class DeviceListener:
@@ -16,7 +17,7 @@ class DeviceListener:
self.offset = device.offset
self.name = device.name
self.cancelled = False
self.logger = logging.getLogger(f'iSponsorBlockTV-{device.screen_id}')
self.logger = logging.getLogger(f"iSponsorBlockTV-{device.screen_id}")
if debug:
self.logger.setLevel(logging.DEBUG)
else:
@@ -26,7 +27,8 @@ class DeviceListener:
self.logger.addHandler(rh)
self.logger.info(f"Starting device")
self.lounge_controller = ytlounge.YtLoungeApi(
device.screen_id, config, api_helper, self.logger)
device.screen_id, config, api_helper, self.logger
)
# Ensures that we have a valid auth token
async def refresh_auth_loop(self):
@@ -68,7 +70,9 @@ class DeviceListener:
await lounge_controller.connect()
except:
pass
self.logger.info("Connected to device %s (%s)", lounge_controller.screen_name, self.name)
self.logger.info(
"Connected to device %s (%s)", lounge_controller.screen_name, self.name
)
try:
# print("Subscribing to lounge")
sub = await lounge_controller.subscribe_monitored(self)
@@ -91,7 +95,9 @@ class DeviceListener:
if state.videoId:
segments = await self.api_helper.get_segments(state.videoId)
if state.state.value == 1: # Playing
self.logger.info(f"Playing video {state.videoId} with {len(segments)} segments")
self.logger.info(
f"Playing video {state.videoId} with {len(segments)} segments"
)
if segments: # If there are segments
await self.time_to_segment(segments, state.currentTime, time_start)
@@ -100,18 +106,20 @@ class DeviceListener:
start_next_segment = None
next_segment = None
for segment in segments:
if position < 2 and (
segment["start"] <= position < segment["end"]
):
if position < 2 and (segment["start"] <= position < segment["end"]):
next_segment = segment
start_next_segment = position # different variable so segment doesn't change
start_next_segment = (
position # different variable so segment doesn't change
)
break
if segment["start"] > position:
next_segment = segment
start_next_segment = next_segment["start"]
break
if start_next_segment:
time_to_next = start_next_segment - position - (time.time() - time_start) - self.offset
time_to_next = (
start_next_segment - position - (time.time() - time_start) - self.offset
)
await self.skip(time_to_next, next_segment["end"], next_segment["UUID"])
# Skips to the next segment (waits for the time to pass)

View File

@@ -362,4 +362,4 @@ MigrationScreen {
height: 1fr;
width: 1fr;
content-align: center middle;
}
}

View File

@@ -2,15 +2,33 @@ import asyncio
import copy
import aiohttp
# Textual imports (Textual is awesome!)
from textual import on
from textual.app import App, ComposeResult
from textual.containers import ScrollableContainer, Grid, Container, Vertical, Horizontal
from textual.containers import (
Container,
Grid,
Horizontal,
ScrollableContainer,
Vertical,
)
from textual.events import Click
from textual.screen import Screen
from textual.validation import Function
from textual.widgets import Button, Footer, Header, Static, Label, Input, SelectionList, Checkbox, ContentSwitcher, \
RadioSet, RadioButton
from textual.widgets import (
Button,
Checkbox,
ContentSwitcher,
Footer,
Header,
Input,
Label,
RadioButton,
RadioSet,
SelectionList,
Static,
)
from textual.widgets.selection_list import Selection
from textual_slider import Slider
@@ -30,7 +48,9 @@ def _validate_pairing_code(pairing_code: str) -> bool:
class ModalWithClickExit(Screen):
"""A modal screen that exits when clicked outside its bounds.
https://discord.com/channels/1026214085173461072/1033754296224841768/1136015817356611604"""
https://discord.com/channels/1026214085173461072/1033754296224841768/1136015817356611604
"""
DEFAULT_CSS = """
ModalWithClickExit {
align: center middle;
@@ -62,8 +82,15 @@ class Element(Static):
pass
def compose(self) -> ComposeResult:
yield Button(label=self.element_name, classes="element-name", disabled=True, id="element-name")
yield Button("Remove", classes="element-remove", variant="error", id="element-remove")
yield Button(
label=self.element_name,
classes="element-name",
disabled=True,
id="element-name",
)
yield Button(
"Remove", classes="element-remove", variant="error", id="element-remove"
)
def on_mount(self) -> None:
if self.tooltip:
@@ -80,9 +107,11 @@ class Device(Element):
if "name" in self.element_data and self.element_data["name"]:
self.element_name = self.element_data["name"]
else:
self.element_name = (f"Unnamed device with id "
f"{self.element_data['screen_id'][:5]}..."
f"{self.element_data['screen_id'][-5:]}")
self.element_name = (
f"Unnamed device with id "
f"{self.element_data['screen_id'][:5]}..."
f"{self.element_data['screen_id'][-5:]}"
)
class Channel(Element):
@@ -92,7 +121,9 @@ class Channel(Element):
if "name" in self.element_data:
self.element_name = self.element_data["name"]
else:
self.element_name = f"Unnamed channel with id {self.element_data['channel_id']}"
self.element_name = (
f"Unnamed channel with id {self.element_data['channel_id']}"
)
class ChannelRadio(RadioButton):
@@ -106,9 +137,12 @@ class ChannelRadio(RadioButton):
class MigrationScreen(ModalWithClickExit):
"""Screen with a dialog to remove old ATVS config."""
BINDINGS = [("escape", "dismiss()", "Cancel"),
("s", "remove_and_save", "Remove and save"),
("q,ctrl+c", "exit", "Exit")]
BINDINGS = [
("escape", "dismiss()", "Cancel"),
("s", "remove_and_save", "Remove and save"),
("q,ctrl+c", "exit", "Exit"),
]
AUTO_FOCUS = "#exit-save"
def compose(self) -> ComposeResult:
@@ -116,9 +150,21 @@ class MigrationScreen(ModalWithClickExit):
Label(
"Welcome to the new configurator! You seem to have the legacy 'atvs' entry on your config file, "
"do you want to remove it?\n(The app won't start with it present)",
id="question", classes="button-100"),
Button("Remove and save", variant="primary", id="migrate-remove-save", classes="button-100"),
Button("Don't remove", variant="error", id="migrate-no-change", classes="button-100"),
id="question",
classes="button-100",
),
Button(
"Remove and save",
variant="primary",
id="migrate-remove-save",
classes="button-100",
),
Button(
"Don't remove",
variant="error",
id="migrate-no-change",
classes="button-100",
),
id="dialog-migration",
)
@@ -138,16 +184,25 @@ class MigrationScreen(ModalWithClickExit):
class ExitScreen(ModalWithClickExit):
"""Screen with a dialog to exit."""
BINDINGS = [("escape", "dismiss()", "Cancel"),
("s", "save", "Save"),
("q,ctrl+c", "exit", "Exit")]
BINDINGS = [
("escape", "dismiss()", "Cancel"),
("s", "save", "Save"),
("q,ctrl+c", "exit", "Exit"),
]
AUTO_FOCUS = "#exit-save"
def compose(self) -> ComposeResult:
yield Grid(
Label("Are you sure you want to exit without saving?", id="question", classes="button-100"),
Label(
"Are you sure you want to exit without saving?",
id="question",
classes="button-100",
),
Button("Save", variant="success", id="exit-save", classes="button-100"),
Button("Don't save", variant="error", id="exit-no-save", classes="button-100"),
Button(
"Don't save", variant="error", id="exit-no-save", classes="button-100"
),
Button("Cancel", variant="primary", id="exit-cancel", classes="button-100"),
id="dialog-exit",
)
@@ -171,6 +226,7 @@ class ExitScreen(ModalWithClickExit):
class AddDevice(ModalWithClickExit):
"""Screen with a dialog to add a device, either with a pairing code or with lan discovery."""
BINDINGS = [("escape", "dismiss({})", "Return")]
def __init__(self, config, **kwargs) -> None:
@@ -184,28 +240,58 @@ class AddDevice(ModalWithClickExit):
with Container(id="add-device-container"):
yield Label("Add Device", classes="title")
with Grid(id="add-device-switch-buttons"):
yield Button("Add with pairing code", id="add-device-pin-button", classes="button-switcher")
yield Button("Add with lan discovery", id="add-device-dial-button", classes="button-switcher")
with ContentSwitcher(id="add-device-switcher", initial="add-device-pin-container"):
yield Button(
"Add with pairing code",
id="add-device-pin-button",
classes="button-switcher",
)
yield Button(
"Add with lan discovery",
id="add-device-dial-button",
classes="button-switcher",
)
with ContentSwitcher(
id="add-device-switcher", initial="add-device-pin-container"
):
with Container(id="add-device-pin-container"):
yield Input(placeholder="Pairing Code (found in Settings - Link with TV code)",
id="pairing-code-input",
validators=[
Function(_validate_pairing_code, "Invalid pairing code format")
]
)
yield Input(placeholder="Device Name (auto filled if empty/optional)", id="device-name-input")
yield Button("Add", id="add-device-pin-add-button", variant="success", disabled=True)
yield Input(
placeholder="Pairing Code (found in Settings - Link with TV code)",
id="pairing-code-input",
validators=[
Function(
_validate_pairing_code, "Invalid pairing code format"
)
],
)
yield Input(
placeholder="Device Name (auto filled if empty/optional)",
id="device-name-input",
)
yield Button(
"Add",
id="add-device-pin-add-button",
variant="success",
disabled=True,
)
yield Label(id="add-device-info")
with Container(id="add-device-dial-container"):
yield Label(
"Make sure your device is on the same network as this computer\nIf it isn't showing up, "
"try restarting the app.\nIf running in docker, make sure to use `--network=host`\nTo refresh "
"the list, close and open the dialog again",
classes="subtitle")
yield SelectionList(("Searching for devices...", "", False), id="dial-devices-list", disabled=True)
yield Button("Add selected devices", id="add-device-dial-add-button", variant="success",
disabled=True)
classes="subtitle",
)
yield SelectionList(
("Searching for devices...", "", False),
id="dial-devices-list",
disabled=True,
)
yield Button(
"Add selected devices",
id="add-device-dial-add-button",
variant="success",
disabled=True,
)
async def on_mount(self) -> None:
self.devices_discovered_dial = []
@@ -228,11 +314,15 @@ class AddDevice(ModalWithClickExit):
@on(Button.Pressed, "#add-device-switch-buttons > *")
def handle_switch_buttons(self, event: Button.Pressed) -> None:
self.query_one("#add-device-switcher").current = event.button.id.replace("-button", "-container")
self.query_one("#add-device-switcher").current = event.button.id.replace(
"-button", "-container"
)
@on(Input.Changed, "#pairing-code-input")
def changed_pairing_code(self, event: Input.Changed):
self.query_one("#add-device-pin-add-button").disabled = not event.validation_result.is_valid
self.query_one(
"#add-device-pin-add-button"
).disabled = not event.validation_result.is_valid
@on(Input.Submitted, "#pairing-code-input")
@on(Button.Pressed, "#add-device-pin-add-button")
@@ -240,7 +330,9 @@ class AddDevice(ModalWithClickExit):
self.query_one("#add-device-pin-add-button").disabled = True
lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV")
pairing_code = self.query_one("#pairing-code-input").value
pairing_code = int(pairing_code.replace("-", "").replace(" ", "")) # remove dashes and spaces
pairing_code = int(
pairing_code.replace("-", "").replace(" ", "")
) # remove dashes and spaces
device_name = self.parent.query_one("#device-name-input").value
paired = False
try:
@@ -255,7 +347,9 @@ class AddDevice(ModalWithClickExit):
}
self.query_one("#pairing-code-input").value = ""
self.query_one("#device-name-input").value = ""
self.query_one("#add-device-info").update(f"[#00ff00][b]Successfully added {device['name']}")
self.query_one("#add-device-info").update(
f"[#00ff00][b]Successfully added {device['name']}"
)
self.dismiss([device])
else:
self.query_one("#pairing-code-input").value = ""
@@ -273,11 +367,14 @@ class AddDevice(ModalWithClickExit):
@on(SelectionList.SelectedChanged, "#dial-devices-list")
def changed_device_list(self, event: SelectionList.SelectedChanged):
self.query_one("#add-device-dial-add-button").disabled = not event.selection_list.selected
self.query_one(
"#add-device-dial-add-button"
).disabled = not event.selection_list.selected
class AddChannel(ModalWithClickExit):
"""Screen with a dialog to add a channel, either using search or with a channel id."""
BINDINGS = [("escape", "dismiss(())", "Return")]
def __init__(self, config, **kwargs) -> None:
@@ -291,32 +388,67 @@ class AddChannel(ModalWithClickExit):
yield Label("Add Channel", classes="title")
yield Label(
"Select a method to add a channel. Adding via search only works if a YouTube api key has been set",
id="add-channel-label", classes="subtitle")
id="add-channel-label",
classes="subtitle",
)
with Grid(id="add-channel-switch-buttons"):
yield Button("Add by channel name", id="add-channel-search-button", classes="button-switcher")
yield Button("Add by channel ID", id="add-channel-id-button", classes="button-switcher")
yield Button(
"Add by channel name",
id="add-channel-search-button",
classes="button-switcher",
)
yield Button(
"Add by channel ID",
id="add-channel-id-button",
classes="button-switcher",
)
yield Label(id="add-channel-info", classes="subtitle")
with ContentSwitcher(id="add-channel-switcher", initial="add-channel-search-container"):
with ContentSwitcher(
id="add-channel-switcher", initial="add-channel-search-container"
):
with Vertical(id="add-channel-search-container"):
if self.config.apikey:
with Grid(id="add-channel-search-inputs"):
yield Input(placeholder="Enter channel name", id="channel-name-input-search")
yield Button("Search", id="search-channel-button", variant="success")
yield RadioSet(RadioButton(label="Search to see results", disabled=True),
id="channel-search-results")
yield Button("Add", id="add-channel-button-search", variant="success", disabled=True,
classes="button-100")
yield Input(
placeholder="Enter channel name",
id="channel-name-input-search",
)
yield Button(
"Search", id="search-channel-button", variant="success"
)
yield RadioSet(
RadioButton(label="Search to see results", disabled=True),
id="channel-search-results",
)
yield Button(
"Add",
id="add-channel-button-search",
variant="success",
disabled=True,
classes="button-100",
)
else:
yield Label(
"[#ff0000]No api key set, cannot search for channels. You can add it the config section "
"below",
id="add-channel-search-no-key", classes="subtitle")
id="add-channel-search-no-key",
classes="subtitle",
)
with Vertical(id="add-channel-id-container"):
yield Input(placeholder="Enter channel ID (example: UCuAXFkgsw1L7xaCfnd5JJOw)",
id="channel-id-input")
yield Input(placeholder="Enter channel name (only used to display in the config file)",
id="channel-name-input-id")
yield Button("Add", id="add-channel-button-id", variant="success", classes="button-100")
yield Input(
placeholder="Enter channel ID (example: UCuAXFkgsw1L7xaCfnd5JJOw)",
id="channel-id-input",
)
yield Input(
placeholder="Enter channel name (only used to display in the config file)",
id="channel-name-input-id",
)
yield Button(
"Add",
id="add-channel-button-id",
variant="success",
classes="button-100",
)
@on(RadioSet.Changed, "#channel-search-results")
def handle_radio_set_changed(self, event: RadioSet.Changed) -> None:
@@ -325,14 +457,18 @@ class AddChannel(ModalWithClickExit):
@on(Button.Pressed, "#add-channel-switch-buttons > *")
def handle_switch_buttons(self, event: Button.Pressed) -> None:
button_ = event.button.id
self.query_one("#add-channel-switcher").current = event.button.id.replace("-button", "-container")
self.query_one("#add-channel-switcher").current = event.button.id.replace(
"-button", "-container"
)
@on(Button.Pressed, "#search-channel-button")
@on(Input.Submitted, "#channel-name-input-search")
async def handle_search_channel(self) -> None:
channel_name = self.query_one("#channel-name-input-search").value
if not channel_name:
self.query_one("#add-channel-info").update("[#ff0000]Please enter a channel name")
self.query_one("#add-channel-info").update(
"[#ff0000]Please enter a channel name"
)
return
self.query_one("#search-channel-button").disabled = True
self.query_one("#add-channel-info").update("Searching...")
@@ -341,7 +477,9 @@ class AddChannel(ModalWithClickExit):
try:
channels_list = await self.api_helper.search_channels(channel_name)
except:
self.query_one("#add-channel-info").update("[#ff0000]Failed to search for channel")
self.query_one("#add-channel-info").update(
"[#ff0000]Failed to search for channel"
)
self.query_one("#search-channel-button").disabled = False
return
for i in channels_list:
@@ -354,7 +492,9 @@ class AddChannel(ModalWithClickExit):
def handle_add_channel_search(self) -> None:
channel = self.query_one("#channel-search-results").pressed_button.channel_data
if not channel:
self.query_one("#add-channel-info").update("[#ff0000]Please select a channel")
self.query_one("#add-channel-info").update(
"[#ff0000]Please select a channel"
)
return
self.query_one("#add-channel-info").update("Adding...")
self.dismiss(channel)
@@ -366,7 +506,9 @@ class AddChannel(ModalWithClickExit):
channel_id = self.query_one("#channel-id-input").value
channel_name = self.query_one("#channel-name-input-id").value
if not channel_id:
self.query_one("#add-channel-info").update("[#ff0000]Please enter a channel ID")
self.query_one("#add-channel-info").update(
"[#ff0000]Please enter a channel ID"
)
return
if not channel_name:
channel_name = channel_id
@@ -377,6 +519,7 @@ class AddChannel(ModalWithClickExit):
class EditDevice(ModalWithClickExit):
"""Screen with a dialog to edit a device. Used by the DevicesManager."""
BINDINGS = [("escape", "close_screen_saving", "Return")]
def __init__(self, device: Element, **kwargs) -> None:
@@ -402,13 +545,24 @@ class EditDevice(ModalWithClickExit):
yield Input(placeholder="Device name", id="device-name-input", value=name)
yield Label("Device screen id")
with Grid(id="device-id-container"):
yield Input(placeholder="Device id", id="device-id-input", value=self.device_data["screen_id"],
password=True)
yield Input(
placeholder="Device id",
id="device-id-input",
value=self.device_data["screen_id"],
password=True,
)
yield Button("Show id", id="device-id-view")
yield Label("Device offset (in milliseconds)")
with Horizontal(id="device-offset-container"):
yield Input(id="device-offset-input", value=str(offset))
yield Slider(name="Device offset", id="device-offset-slider", min=0, max=2000, step=100, value=offset)
yield Slider(
name="Device offset",
id="device-offset-slider",
min=0,
max=2000,
step=100,
value=offset,
)
def on_slider_changed(self, event: Slider.Changed) -> None:
offset_input = self.query_one("#device-offset-offset_input")
@@ -491,9 +645,15 @@ class ApiKeyManager(Vertical):
yield Label(
"You can get a YouTube Data API v3 Key from the ["
"link=https://console.developers.google.com/apis/credentials]Google Cloud Console[/link]. This key is "
"only required if you're whitelisting channels.")
"only required if you're whitelisting channels."
)
with Grid(id="api-key-grid"):
yield Input(placeholder="YouTube Api Key", id="api-key-input", password=True, value=self.config.apikey)
yield Input(
placeholder="YouTube Api Key",
id="api-key-input",
password=True,
value=self.config.apikey,
)
yield Button("Show key", id="api-key-view")
@on(Input.Changed, "#api-key-input")
@@ -549,9 +709,14 @@ class SkipCountTrackingManager(Vertical):
"helped others and used as a metric along with upvotes to ensure that spam doesn't get into the database. "
"The program sends a message to the sponsor block server each time you skip a segment. Hopefully most "
"people don't change this setting so that the view numbers are accurate. :)",
classes="subtitle", id="skip-count-tracking-subtitle")
yield Checkbox(value=self.config.skip_count_tracking, id="skip-count-tracking-switch",
label="Enable skip count tracking")
classes="subtitle",
id="skip-count-tracking-subtitle",
)
yield Checkbox(
value=self.config.skip_count_tracking,
id="skip-count-tracking-switch",
label="Enable skip count tracking",
)
@on(Checkbox.Changed, "#skip-count-tracking-switch")
def changed_skip_tracking(self, event: Checkbox.Changed):
@@ -570,12 +735,20 @@ class AdSkipMuteManager(Vertical):
yield Label(
"This feature allows you to automatically mute and/or skip native YouTube ads. Skipping ads only works if "
"that ad shows the 'Skip Ad' button, if it doesn't then it will only be able to be muted.",
classes="subtitle", id="skip-count-tracking-subtitle")
classes="subtitle",
id="skip-count-tracking-subtitle",
)
with Horizontal(id="ad-skip-mute-container"):
yield Checkbox(value=self.config.skip_ads, id="skip-ads-switch",
label="Enable skipping ads")
yield Checkbox(value=self.config.mute_ads, id="mute-ads-switch",
label="Enable muting ads")
yield Checkbox(
value=self.config.skip_ads,
id="skip-ads-switch",
label="Enable skipping ads",
)
yield Checkbox(
value=self.config.mute_ads,
id="mute-ads-switch",
label="Enable muting ads",
)
@on(Checkbox.Changed, "#mute-ads-switch")
def changed_mute(self, event: Checkbox.Changed):
@@ -598,16 +771,22 @@ class ChannelWhitelistManager(Vertical):
yield Label(
"This feature allows to whitelist channels from being skipped. This feature is automatically disabled "
"when no channels have been specified.",
classes="subtitle", id="channel-whitelist-subtitle")
yield Label(":warning: [#FF0000]You need to set your YouTube Api Key in order to use this feature",
id="warning-no-key")
classes="subtitle",
id="channel-whitelist-subtitle",
)
yield Label(
":warning: [#FF0000]You need to set your YouTube Api Key in order to use this feature",
id="warning-no-key",
)
with Horizontal(id="add-channel-button-container"):
yield Button("Add Channel", id="add-channel", classes="button-100")
for channel in self.config.channel_whitelist:
yield Channel(channel)
def on_mount(self) -> None:
self.app.query_one("#warning-no-key").display = (not self.config.apikey) and bool(self.config.channel_whitelist)
self.app.query_one("#warning-no-key").display = (
not self.config.apikey
) and bool(self.config.channel_whitelist)
def new_channel(self, channel: tuple) -> None:
if channel:
@@ -619,16 +798,18 @@ class ChannelWhitelistManager(Vertical):
channel_widget = Channel(channel_dict)
self.mount(channel_widget)
channel_widget.focus(scroll_visible=True)
self.app.query_one("#warning-no-key").display = (not self.config.apikey) and bool(
self.config.channel_whitelist)
self.app.query_one("#warning-no-key").display = (
not self.config.apikey
) and bool(self.config.channel_whitelist)
@on(Button.Pressed, "#element-remove")
def remove_channel(self, event: Button.Pressed):
channel_to_remove: Element = event.button.parent
self.config.channel_whitelist.remove(channel_to_remove.element_data)
channel_to_remove.remove()
self.app.query_one("#warning-no-key").display = (not self.config.apikey) and bool(
self.config.channel_whitelist)
self.app.query_one("#warning-no-key").display = (
not self.config.apikey
) and bool(self.config.channel_whitelist)
@on(Button.Pressed, "#add-channel")
def add_channel(self, event: Button.Pressed):
@@ -637,12 +818,10 @@ class ChannelWhitelistManager(Vertical):
class ISponsorBlockTVSetupMainScreen(Screen):
"""Making this a separate screen to avoid a bug: https://github.com/Textualize/textual/issues/3221"""
TITLE = "iSponsorBlockTV"
SUB_TITLE = "Setup Wizard"
BINDINGS = [
("q,ctrl+c", "exit_modal", "Exit"),
("s", "save", "Save")
]
BINDINGS = [("q,ctrl+c", "exit_modal", "Exit"), ("s", "save", "Save")]
AUTO_FOCUS = None
def __init__(self, config, **kwargs) -> None:
@@ -655,12 +834,24 @@ class ISponsorBlockTVSetupMainScreen(Screen):
yield Header()
yield Footer()
with ScrollableContainer(id="setup-wizard"):
yield DevicesManager(config=self.config, id="devices-manager", classes="container")
yield SkipCategoriesManager(config=self.config, id="skip-categories-manager", classes="container")
yield SkipCountTrackingManager(config=self.config, id="count-segments-manager", classes="container")
yield AdSkipMuteManager(config=self.config, id="ad-skip-mute-manager", classes="container")
yield ChannelWhitelistManager(config=self.config, id="channel-whitelist-manager", classes="container")
yield ApiKeyManager(config=self.config, id="api-key-manager", classes="container")
yield DevicesManager(
config=self.config, id="devices-manager", classes="container"
)
yield SkipCategoriesManager(
config=self.config, id="skip-categories-manager", classes="container"
)
yield SkipCountTrackingManager(
config=self.config, id="count-segments-manager", classes="container"
)
yield AdSkipMuteManager(
config=self.config, id="ad-skip-mute-manager", classes="container"
)
yield ChannelWhitelistManager(
config=self.config, id="channel-whitelist-manager", classes="container"
)
yield ApiKeyManager(
config=self.config, id="api-key-manager", classes="container"
)
def on_mount(self) -> None:
if self.check_for_old_config_entries():
@@ -686,7 +877,9 @@ class ISponsorBlockTVSetupMainScreen(Screen):
def changed_api_key(self, event: Input.Changed):
try: # ChannelWhitelist might not be mounted
# Show if no api key is set and at least one channel is in the whitelist
self.app.query_one("#warning-no-key").display = (not event.input.value) and self.config.channel_whitelist
self.app.query_one("#warning-no-key").display = (
not event.input.value
) and self.config.channel_whitelist
except:
pass
@@ -694,10 +887,7 @@ class ISponsorBlockTVSetupMainScreen(Screen):
class ISponsorBlockTVSetup(App):
CSS_PATH = "setup-wizard-style.tcss" # tcss is the recommended extension for textual css files
# Bindings for the whole app here, so they are available in all screens
BINDINGS = [
("q,ctrl+c", "exit_modal", "Exit"),
("s", "save", "Save")
]
BINDINGS = [("q,ctrl+c", "exit_modal", "Exit"), ("s", "save", "Save")]
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)

View File

@@ -1,5 +1,6 @@
import asyncio
import json
import pyytlounge
from .constants import youtube_client_blacklist
@@ -25,7 +26,9 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
# Ensures that we still are subscribed to the lounge
async def _watchdog(self):
await asyncio.sleep(35) # YouTube sends at least a message every 30 seconds (no-op or any other)
await asyncio.sleep(
35
) # YouTube sends at least a message every 30 seconds (no-op or any other)
try:
self.subscribe_task.cancel()
except Exception:
@@ -68,14 +71,18 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
create_task(self.mute(False, override=True))
elif event_type == "onAdStateChange":
data = args[0]
if data["adState"] == '0': # Ad is not playing
if data["adState"] == "0": # Ad is not playing
# print("Ad has ended, unmuting")
create_task(self.mute(False, override=True))
elif self.skip_ads and data["isSkipEnabled"] == "true": # YouTube uses strings for booleans
elif (
self.skip_ads and data["isSkipEnabled"] == "true"
): # YouTube uses strings for booleans
self._logger.info("Ad can be skipped, skipping")
create_task(self.skip_ad())
create_task(self.mute(False, override=True))
elif self.mute_ads: # Seen multiple other adStates, assuming they are all ads
elif (
self.mute_ads
): # Seen multiple other adStates, assuming they are all ads
self._logger.info("Ad has started, muting")
create_task(self.mute(True, override=True))
# Manages volume, useful since YouTube wants to know the volume when unmuting (even if they already have it)
@@ -84,7 +91,9 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
pass
# Gets segments for the next video before it starts playing
elif event_type == "autoplayUpNext":
if len(args) > 0 and (vid_id := args[0]["videoId"]): # if video id is not empty
if len(args) > 0 and (
vid_id := args[0]["videoId"]
): # if video id is not empty
print(f"Getting segments for next video: {vid_id}")
create_task(self.api_helper.get_segments(vid_id))
@@ -95,11 +104,15 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
if vid_id := data["contentVideoId"]:
self._logger.info(f"Getting segments for next video: {vid_id}")
create_task(self.api_helper.get_segments(vid_id))
elif self.skip_ads and data["isSkipEnabled"] == "true": # YouTube uses strings for booleans
elif (
self.skip_ads and data["isSkipEnabled"] == "true"
): # YouTube uses strings for booleans
self._logger.info("Ad can be skipped, skipping")
create_task(self.skip_ad())
create_task(self.mute(False, override=True))
elif self.mute_ads: # Seen multiple other adStates, assuming they are all ads
elif (
self.mute_ads
): # Seen multiple other adStates, assuming they are all ads
self._logger.info("Ad has started, muting")
create_task(self.mute(True, override=True))
@@ -142,10 +155,15 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
if override or not (self.volume_state.get("muted", "false") == mute_str):
self.volume_state["muted"] = mute_str
# YouTube wants the volume when unmuting, so we send it
await super()._command("setVolume", {"volume": self.volume_state.get("volume", 100), "muted": mute_str})
await super()._command(
"setVolume",
{"volume": self.volume_state.get("volume", 100), "muted": mute_str},
)
async def set_auto_play_mode(self, enabled: bool):
await super()._command("setAutoplayMode", {"autoplayMode": "ENABLED" if enabled else "DISABLED"})
await super()._command(
"setAutoplayMode", {"autoplayMode": "ENABLED" if enabled else "DISABLED"}
)
async def play_video(self, video_id: str) -> bool:
return await self._command("setPlaylist", {"videoId": video_id})