Clean code and fix #121

This commit is contained in:
dmunozv04
2023-12-29 16:19:44 +01:00
parent 35652b6247
commit c3fd67df27
12 changed files with 151 additions and 98 deletions

View File

@@ -72,4 +72,5 @@ jobs:
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=registry,ref=ghcr.io/dmunozv04/isponsorblocktv:buildcache
cache-to: type=registry,ref=ghcr.io/dmunozv04/isponsorblocktv:buildcache,mode=max
# Only cache if it's not a pull request
cache-to: ${{ github.event_name != 'pull_request' && 'type=registry,ref=ghcr.io/dmunozv04/isponsorblocktv:buildcache,mode=max' || '' }}

View File

@@ -1,5 +1,6 @@
from . import helpers
def main():
helpers.app_start()

View File

@@ -6,11 +6,11 @@ from aiohttp import ClientSession
import html
def listToTuple(function):
def list_to_tuple(function):
def wrapper(*args):
args = [tuple(x) if type(x) == list else x for x in args]
args = [tuple(x) if x is list else x for x in args]
result = function(*args)
result = tuple(result) if type(result) == list else result
result = tuple(result) if result is list else result
return result
return wrapper
@@ -80,26 +80,26 @@ class ApiHelper:
return channels
for i in data["items"]:
# Get channel subcription number
# Get channel subscription number
params = {"id": i["snippet"]["channelId"], "key": self.apikey, "part": "statistics"}
url = constants.Youtube_api + "channels"
async with self.web_session.get(url, params=params) as resp:
channelData = await resp.json()
channel_data = await resp.json()
if channelData["items"][0]["statistics"]["hiddenSubscriberCount"]:
subCount = "Hidden"
if channel_data["items"][0]["statistics"]["hiddenSubscriberCount"]:
sub_count = "Hidden"
else:
subCount = int(channelData["items"][0]["statistics"]["subscriberCount"])
subCount = format(subCount, "_")
sub_count = int(channel_data["items"][0]["statistics"]["subscriberCount"])
sub_count = format(sub_count, "_")
channels.append((i["snippet"]["channelId"], i["snippet"]["channelTitle"], subCount))
channels.append((i["snippet"]["channelId"], i["snippet"]["channelTitle"], sub_count))
return channels
@listToTuple # Convert list to tuple so it can be used as a key in the cache
@list_to_tuple # Convert list to tuple so it can be used as a key in the cache
@AsyncConditionalTTL(time_to_live=300, maxsize=10) # 5 minutes for non-locked segments
async def get_segments(self, vid_id):
if await self.is_whitelisted(vid_id):
return ([], True) # Return empty list and True to indicate that the cache should last forever
return [], True # Return empty list and True to indicate that the cache should last forever
vid_id_hashed = sha256(vid_id.encode("utf-8")).hexdigest()[
:4
] # Hashes video id and gets the first 4 characters
@@ -117,7 +117,7 @@ class ApiHelper:
print(
f"Error getting segments for video {vid_id}, hashed as {vid_id_hashed}. "
f"Code: {response.status} - {response_text}")
return ([], True)
return [], True
for i in response_json:
if str(i["videoID"]) == str(vid_id):
response_json = i
@@ -144,20 +144,20 @@ class ApiHelper:
segment_before_end = -10
if (
segment_dict["start"] - segment_before_end < 1
): # Less than 1 second appart, combine them and skip them together
): # Less than 1 second apart, combine them and skip them together
segment_dict["start"] = segment_before_start
segment_dict["UUID"].extend(segment_before_UUID)
segments.pop()
segments.append(segment_dict)
except Exception:
pass
return (segments, ignore_ttl)
return segments, ignore_ttl
async def mark_viewed_segments(self, UUID):
async def mark_viewed_segments(self, uuids):
"""Marks the segments as viewed in the SponsorBlock API, if skip_count_tracking is enabled.
Lets the contributor know that someone skipped the segment (thanks)"""
if self.skip_count_tracking:
for i in UUID:
for i in uuids:
url = constants.SponsorBlock_api + "viewedVideoSponsorTime/"
params = {"UUID": i}
await self.web_session.post(url, params=params)

View File

@@ -1,3 +1,7 @@
from cache.key import KEY
from cache.lru import LRU
import datetime
"""MIT License
Copyright (c) 2020 Rajat Singh
@@ -21,10 +25,6 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE."""
'''Modified code from https://github.com/iamsinghrajat/async-cache'''
from cache.key import KEY
from cache.lru import LRU
import datetime
class AsyncConditionalTTL:
class _TTL(LRU):

View File

@@ -49,14 +49,12 @@ def main(config, debug: bool) -> None:
if apikey:
if input("API key already specified. Change it? (y/n) ") == "y":
apikey = input("Enter your API key: ")
config["apikey"] = apikey
else:
if input("API key only needed for the channel whitelist function. Add it? (y/n) ") == "y":
print(
"Get youtube apikey here: https://developers.google.com/youtube/registering_an_application"
)
apikey = input("Enter your API key: ")
config["apikey"] = apikey
config.apikey = apikey
skip_categories = config.skip_categories

View File

@@ -46,12 +46,12 @@ def get_ip():
try:
# doesn't even have to be reachable
s.connect(('10.254.254.254', 1))
IP = s.getsockname()[0]
ip = s.getsockname()[0]
except Exception:
IP = '127.0.0.1'
ip = '127.0.0.1'
finally:
s.close()
return IP
return ip
class Handler(ssdp.aio.SSDP):

View File

@@ -36,7 +36,7 @@ class Config:
self.devices = []
self.apikey = ""
self.skip_categories = []
self.skip_categories = [] # These are the categories on the config file
self.channel_whitelist = []
self.skip_count_tracking = True
self.mute_ads = False
@@ -61,7 +61,7 @@ class Config:
if not self.apikey and self.channel_whitelist:
raise ValueError("No youtube API key found and channel whitelist is not empty")
if not self.skip_categories:
self.categories = ["sponsor"]
self.skip_categories = ["sponsor"]
print("No categories found, using default: sponsor")
def __load(self):

View File

@@ -1,9 +1,33 @@
import logging
from rich.logging import RichHandler
from rich._log_render import LogRender
from rich.text import Text
from rich.style import Style
'''
Copyright (c) 2020 Will McGugan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Modified code from rich (https://github.com/textualize/rich)
'''
class LogHandler(RichHandler):
def __init__(self, device_name, log_name_len, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -31,12 +55,25 @@ class LogHandler(RichHandler):
return self._filter(original)
class LogRender(LogRender):
def __init__(self, device_name, log_name_len, *args, **kwargs):
super().__init__(*args, **kwargs)
class LogRender:
def __init__(self, device_name,
log_name_len,
show_time=True,
show_level=False,
show_path=True,
time_format="[%x %X]",
omit_repeated_times=True,
level_width=8):
self.filter_strings = []
self.log_name_len = log_name_len
self.device_name = device_name
self.show_time = show_time
self.show_level = show_level
self.show_path = show_path
self.time_format = time_format
self.omit_repeated_times = omit_repeated_times
self.level_width = level_width
self._last_time = None
def __call__(
self,
@@ -100,6 +137,7 @@ class LogFormatter(logging.Formatter):
def __init__(self, fmt=None, datefmt=None, style="%", validate=True, filter_strings=None):
super().__init__(fmt, datefmt, style, validate)
self.filter_strings = filter_strings or []
def add_filter_string(self, s):
self.filter_strings.append(s)
@@ -108,6 +146,7 @@ class LogFormatter(logging.Formatter):
for i in self.filter_strings:
s = s.replace(i, "REDACTED")
return s
def format(self, record):
original = logging.Formatter.format(self, record)
return self._filter(original)

View File

@@ -40,7 +40,8 @@ def main():
create_plist(correct_path)
run_setup(correct_path + "/config.json")
print(
"Launch daemon installed. Please restart the computer to enable it or use:\n launchctl load ~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist"
'Launch daemon installed. Please restart the computer to enable it or use:\n launchctl load '
'~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist'
)
else:
if not os.path.exists(correct_path):
@@ -48,6 +49,6 @@ def main():
print(
"Please move the program to the correct path: "
+ correct_path
+ "opeing now on finder..."
+ "opening now on finder..."
)
os.system("open -R " + correct_path)

View File

@@ -3,6 +3,7 @@ import aiohttp
import time
import logging
import rich
from typing import Optional
from signal import signal, SIGINT, SIGTERM
from . import api_helpers, ytlounge, logging_helpers
@@ -10,7 +11,7 @@ from . import api_helpers, ytlounge, logging_helpers
class DeviceListener:
def __init__(self, api_helper, config, device, log_name_len, debug: bool):
self.task: asyncio.Task = None
self.task: Optional[asyncio.Task] = None
self.api_helper = api_helper
self.offset = device.offset
self.name = device.name
@@ -52,7 +53,6 @@ class DeviceListener:
self.logger.debug("Refreshing auth")
await lounge_controller.refresh_auth()
except:
# traceback.print_exc()
await asyncio.sleep(10)
while not self.cancelled:
while not (await self.is_available()) and not self.cancelled:
@@ -68,7 +68,7 @@ class DeviceListener:
await lounge_controller.connect()
except:
pass
self.logger.info(f"Connected to device {lounge_controller.screen_name} ({self.name})")
self.logger.info("Connected to device %s (%s)", lounge_controller.screen_name, self.name)
try:
# print("Subscribing to lounge")
sub = await lounge_controller.subscribe_monitored(self)
@@ -115,11 +115,11 @@ class DeviceListener:
await self.skip(time_to_next, next_segment["end"], next_segment["UUID"])
# Skips to the next segment (waits for the time to pass)
async def skip(self, time_to, position, UUID):
async def skip(self, time_to, position, uuids):
await asyncio.sleep(time_to)
self.logger.info(f"Skipping segment: seeking to {position}")
asyncio.create_task(self.api_helper.mark_viewed_segments(UUID))
asyncio.create_task(self.lounge_controller.seek_to(position))
self.logger.info("Skipping segment: seeking to %s", position)
await asyncio.create_task(self.api_helper.mark_viewed_segments(uuids))
await asyncio.create_task(self.lounge_controller.seek_to(position))
# Stops the connection to the device
async def cancel(self):

View File

@@ -80,7 +80,9 @@ class Device(Element):
if "name" in self.element_data and self.element_data["name"]:
self.element_name = self.element_data["name"]
else:
self.element_name = f"Unnamed device with id {self.element_data['screen_id'][:5]}...{self.element_data['screen_id'][-5:]}"
self.element_name = (f"Unnamed device with id "
f"{self.element_data['screen_id'][:5]}..."
f"{self.element_data['screen_id'][-5:]}")
class Channel(Element):
@@ -112,7 +114,8 @@ class MigrationScreen(ModalWithClickExit):
def compose(self) -> ComposeResult:
yield Grid(
Label(
"Welcome to the new configurator! You seem to have the legacy 'atvs' entry on your config file, do you want to remove it?\n(The app won't start with it present)",
"Welcome to the new configurator! You seem to have the legacy 'atvs' entry on your config file, "
"do you want to remove it?\n(The app won't start with it present)",
id="question", classes="button-100"),
Button("Remove and save", variant="primary", id="migrate-remove-save", classes="button-100"),
Button("Don't remove", variant="error", id="migrate-no-change", classes="button-100"),
@@ -196,7 +199,9 @@ class AddDevice(ModalWithClickExit):
yield Label(id="add-device-info")
with Container(id="add-device-dial-container"):
yield Label(
"Make sure your device is on the same network as this computer\nIf it isn't showing up, try restarting the app.\nIf running in docker, make sure to use `--network=host`\nTo refresh the list, close and open the dialog again",
"Make sure your device is on the same network as this computer\nIf it isn't showing up, "
"try restarting the app.\nIf running in docker, make sure to use `--network=host`\nTo refresh "
"the list, close and open the dialog again",
classes="subtitle")
yield SelectionList(("Searching for devices...", "", False), id="dial-devices-list", disabled=True)
yield Button("Add selected devices", id="add-device-dial-add-button", variant="success",
@@ -223,7 +228,6 @@ class AddDevice(ModalWithClickExit):
@on(Button.Pressed, "#add-device-switch-buttons > *")
def handle_switch_buttons(self, event: Button.Pressed) -> None:
button_ = event.button.id
self.query_one("#add-device-switcher").current = event.button.id.replace("-button", "-container")
@on(Input.Changed, "#pairing-code-input")
@@ -304,7 +308,8 @@ class AddChannel(ModalWithClickExit):
classes="button-100")
else:
yield Label(
"[#ff0000]No api key set, cannot search for channels. You can add it the config section below",
"[#ff0000]No api key set, cannot search for channels. You can add it the config section "
"below",
id="add-channel-search-no-key", classes="subtitle")
with Vertical(id="add-channel-id-container"):
yield Input(placeholder="Enter channel ID (example: UCuAXFkgsw1L7xaCfnd5JJOw)",
@@ -406,9 +411,9 @@ class EditDevice(ModalWithClickExit):
yield Slider(name="Device offset", id="device-offset-slider", min=0, max=2000, step=100, value=offset)
def on_slider_changed(self, event: Slider.Changed) -> None:
input = self.query_one("#device-offset-input")
with input.prevent(Input.Changed):
input.value = str(event.slider.value)
offset_input = self.query_one("#device-offset-offset_input")
with offset_input.prevent(Input.Changed):
offset_input.value = str(event.slider.value)
def on_input_changed(self, event: Input.Changed):
if event.input.id == "device-offset-input":
@@ -430,7 +435,8 @@ class EditDevice(ModalWithClickExit):
class DevicesManager(Vertical):
"""Manager for devices, allows to add, edit and remove devices."""
"""Manager for devices, allows adding, edit and removing devices."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
@@ -452,7 +458,8 @@ class DevicesManager(Vertical):
self.mount(device_widget)
device_widget.focus(scroll_visible=True)
def edit_device(self, device_widget: Element) -> None:
@staticmethod
def edit_device(device_widget: Element) -> None:
device_widget.process_values_from_data()
device_widget.query_one("#element-name").label = device_widget.element_name
@@ -474,6 +481,7 @@ class DevicesManager(Vertical):
class ApiKeyManager(Vertical):
"""Manager for the YouTube Api Key."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
@@ -481,7 +489,9 @@ class ApiKeyManager(Vertical):
def compose(self) -> ComposeResult:
yield Label("YouTube Api Key", classes="title")
yield Label(
"You can get a YouTube Data API v3 Key from the [link=https://console.developers.google.com/apis/credentials]Google Cloud Console[/link]. This key is only required if you're whitelisting channels.")
"You can get a YouTube Data API v3 Key from the ["
"link=https://console.developers.google.com/apis/credentials]Google Cloud Console[/link]. This key is "
"only required if you're whitelisting channels.")
with Grid(id="api-key-grid"):
yield Input(placeholder="YouTube Api Key", id="api-key-input", password=True, value=self.config.apikey)
yield Button("Show key", id="api-key-view")
@@ -489,10 +499,6 @@ class ApiKeyManager(Vertical):
@on(Input.Changed, "#api-key-input")
def changed_api_key(self, event: Input.Changed):
self.config.apikey = event.input.value
# try: # ChannelWhitelist might not be mounted
# self.app.query_one("#warning-no-key").display = not self.config.apikey
# except:
# pass
@on(Button.Pressed, "#api-key-view")
def pressed_api_key_view(self, event: Button.Pressed):
@@ -505,7 +511,8 @@ class ApiKeyManager(Vertical):
class SkipCategoriesManager(Vertical):
"""Manager for skip categories, allows to select which categories to skip."""
"""Manager for skip categories, allows selecting which categories to skip."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
@@ -530,6 +537,7 @@ class SkipCategoriesManager(Vertical):
class SkipCountTrackingManager(Vertical):
"""Manager for skip count tracking, allows to enable/disable skip count tracking."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
@@ -537,7 +545,10 @@ class SkipCountTrackingManager(Vertical):
def compose(self) -> ComposeResult:
yield Label("Skip count tracking", classes="title")
yield Label(
"This feature tracks which segments you have skipped to let users know how much their submission has helped others and used as a metric along with upvotes to ensure that spam doesn't get into the database. The program sends a message to the sponsor block server each time you skip a segment. Hopefully most people don't change this setting so that the view numbers are accurate. :)",
"This feature tracks which segments you have skipped to let users know how much their submission has "
"helped others and used as a metric along with upvotes to ensure that spam doesn't get into the database. "
"The program sends a message to the sponsor block server each time you skip a segment. Hopefully most "
"people don't change this setting so that the view numbers are accurate. :)",
classes="subtitle", id="skip-count-tracking-subtitle")
yield Checkbox(value=self.config.skip_count_tracking, id="skip-count-tracking-switch",
label="Enable skip count tracking")
@@ -549,6 +560,7 @@ class SkipCountTrackingManager(Vertical):
class AdSkipMuteManager(Vertical):
"""Manager for ad skip/mute, allows to enable/disable ad skip/mute."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
@@ -556,7 +568,8 @@ class AdSkipMuteManager(Vertical):
def compose(self) -> ComposeResult:
yield Label("Skip/Mute ads", classes="title")
yield Label(
"This feature allows you to automatically mute and/or skip native YouTube ads. Skipping ads only works if that ad shows the 'Skip Ad' button, if it doesn't then it will only be able to be muted.",
"This feature allows you to automatically mute and/or skip native YouTube ads. Skipping ads only works if "
"that ad shows the 'Skip Ad' button, if it doesn't then it will only be able to be muted.",
classes="subtitle", id="skip-count-tracking-subtitle")
with Horizontal(id="ad-skip-mute-container"):
yield Checkbox(value=self.config.skip_ads, id="skip-ads-switch",
@@ -574,7 +587,8 @@ class AdSkipMuteManager(Vertical):
class ChannelWhitelistManager(Vertical):
"""Manager for channel whitelist, allows to add/remove channels from the whitelist."""
"""Manager for channel whitelist, allows adding/removing channels from the whitelist."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
@@ -582,7 +596,8 @@ class ChannelWhitelistManager(Vertical):
def compose(self) -> ComposeResult:
yield Label("Channel Whitelist", classes="title")
yield Label(
"This feature allows to whitelist channels from being skipped. This feature is automatically disabled when no channels have been specified.",
"This feature allows to whitelist channels from being skipped. This feature is automatically disabled "
"when no channels have been specified.",
classes="subtitle", id="channel-whitelist-subtitle")
yield Label(":warning: [#FF0000]You need to set your YouTube Api Key in order to use this feature",
id="warning-no-key")
@@ -593,6 +608,7 @@ class ChannelWhitelistManager(Vertical):
def on_mount(self) -> None:
self.app.query_one("#warning-no-key").display = (not self.config.apikey) and bool(self.config.channel_whitelist)
def new_channel(self, channel: tuple) -> None:
if channel:
channel_dict = {
@@ -619,7 +635,7 @@ class ChannelWhitelistManager(Vertical):
self.app.push_screen(AddChannel(self.config), callback=self.new_channel)
class iSponsorBlockTVSetupMainScreen(Screen):
class ISponsorBlockTVSetupMainScreen(Screen):
"""Making this a separate screen to avoid a bug: https://github.com/Textualize/textual/issues/3221"""
TITLE = "iSponsorBlockTV"
SUB_TITLE = "Setup Wizard"
@@ -668,14 +684,14 @@ class iSponsorBlockTVSetupMainScreen(Screen):
@on(Input.Changed, "#api-key-input")
def changed_api_key(self, event: Input.Changed):
print("HIIII")
try: # ChannelWhitelist might not be mounted
# Show if no api key is set and at least one channel is in the whitelist
self.app.query_one("#warning-no-key").display = (not event.input.value) and self.config.channel_whitelist
except:
pass
class iSponsorBlockTVSetup(App):
class ISponsorBlockTVSetup(App):
CSS_PATH = "setup-wizard-style.tcss" # tcss is the recommended extension for textual css files
# Bindings for the whole app here, so they are available in all screens
BINDINGS = [
@@ -686,7 +702,7 @@ class iSponsorBlockTVSetup(App):
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
self.main_screen = iSponsorBlockTVSetupMainScreen(config=self.config)
self.main_screen = ISponsorBlockTVSetupMainScreen(config=self.config)
def on_mount(self) -> None:
self.push_screen(self.main_screen)
@@ -699,5 +715,5 @@ class iSponsorBlockTVSetup(App):
def main(config):
app = iSponsorBlockTVSetup(config)
app = ISponsorBlockTVSetup(config)
app.run()

View File

@@ -1,7 +1,7 @@
import asyncio
import json
import aiohttp
import pyytlounge
from .constants import youtube_client_blacklist
create_task = asyncio.create_task
@@ -83,11 +83,10 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
self.volume_state = args[0]
pass
# Gets segments for the next video before it starts playing
# Comment "fix" since it doesn't seem to work
# elif event_type == "autoplayUpNext":
# if len(args) > 0 and (vid_id := args[0]["videoId"]): # if video id is not empty
# print(f"Getting segments for next video: {vid_id}")
# create_task(self.api_helper.get_segments(vid_id))
elif event_type == "autoplayUpNext":
if len(args) > 0 and (vid_id := args[0]["videoId"]): # if video id is not empty
print(f"Getting segments for next video: {vid_id}")
create_task(self.api_helper.get_segments(vid_id))
# #Used to know if an ad is skippable or not
elif event_type == "adPlaying":
@@ -113,9 +112,7 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
if device_info.get("clientName", "") in youtube_client_blacklist:
self._sid = None
self._gsession = None # Force disconnect
# elif event_type == "onAutoplayModeChanged":
# data = args[0]
# create_task(self.set_auto_play_mode(data["autoplayMode"] == "ENABLED"))
elif event_type == "onSubtitlesTrackChanged":
if self.shorts_disconnected:
data = args[0]