Merge pull request #271 from dmunozv04/clean-code

Clean code
This commit is contained in:
David
2025-03-10 13:07:36 +01:00
committed by GitHub
10 changed files with 110 additions and 154 deletions

View File

@@ -120,7 +120,8 @@ class ApiHelper:
return ( return (
[], [],
True, True,
) # Return empty list and True to indicate that the cache should last forever ) # Return empty list and True to indicate
# that the cache should last forever
vid_id_hashed = sha256(vid_id.encode("utf-8")).hexdigest()[ vid_id_hashed = sha256(vid_id.encode("utf-8")).hexdigest()[
:4 :4
] # Hashes video id and gets the first 4 characters ] # Hashes video id and gets the first 4 characters
@@ -183,7 +184,7 @@ class ApiHelper:
segment_before_start = segments[-1]["start"] segment_before_start = segments[-1]["start"]
segment_before_UUID = segments[-1]["UUID"] segment_before_UUID = segments[-1]["UUID"]
except Exception: except IndexError:
segment_before_end = -10 segment_before_end = -10
if ( if (
segment_dict["start"] - segment_before_end < 1 segment_dict["start"] - segment_before_end < 1
@@ -192,12 +193,13 @@ class ApiHelper:
segment_dict["UUID"].extend(segment_before_UUID) segment_dict["UUID"].extend(segment_before_UUID)
segments.pop() segments.pop()
segments.append(segment_dict) segments.append(segment_dict)
except Exception: except BaseException:
pass pass
return segments, ignore_ttl return segments, ignore_ttl
async def mark_viewed_segments(self, uuids): async def mark_viewed_segments(self, uuids):
"""Marks the segments as viewed in the SponsorBlock API, if skip_count_tracking is enabled. """Marks the segments as viewed in the SponsorBlock API
if skip_count_tracking is enabled.
Lets the contributor know that someone skipped the segment (thanks)""" Lets the contributor know that someone skipped the segment (thanks)"""
if self.skip_count_tracking: if self.skip_count_tracking:
for i in uuids: for i in uuids:

View File

@@ -3,28 +3,29 @@ import datetime
from cache.key import KEY from cache.key import KEY
from cache.lru import LRU from cache.lru import LRU
"""MIT License # MIT License
Copyright (c) 2020 Rajat Singh # Copyright (c) 2020 Rajat Singh
Permission is hereby granted, free of charge, to any person obtaining a copy # Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal # of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights # in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is # copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions: # furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all # The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software. # copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.""" # SOFTWARE.
"""Modified code from https://github.com/iamsinghrajat/async-cache"""
# Modified code from https://github.com/iamsinghrajat/async-cache
class AsyncConditionalTTL: class AsyncConditionalTTL:

View File

@@ -43,6 +43,7 @@ def get_yn_input(prompt):
if choice.lower() in ["y", "n"]: if choice.lower() in ["y", "n"]:
return choice.lower() return choice.lower()
print("Invalid input. Please enter 'y' or 'n'.") print("Invalid input. Please enter 'y' or 'n'.")
return None
async def create_web_session(): async def create_web_session():

View File

@@ -22,3 +22,5 @@ youtube_client_blacklist = ["TVHTML5_FOR_KIDS"]
config_file_blacklist_keys = ["config_file", "data_dir"] config_file_blacklist_keys = ["config_file", "data_dir"]
github_wiki_base_url = "https://github.com/dmunozv04/iSponsorBlockTV/wiki"

View File

@@ -7,46 +7,47 @@ import ssdp
import xmltodict import xmltodict
from ssdp import network from ssdp import network
"""Redistribution and use of the DIAL DIscovery And Launch protocol # Redistribution and use of the DIAL DIscovery And Launch protocol
specification (the “DIAL Specification”), with or without modification, # specification (the “DIAL Specification”), with or without modification,
are permitted provided that the following conditions are met: ● # are permitted provided that the following conditions are met: ●
Redistributions of the DIAL Specification must retain the above copyright # Redistributions of the DIAL Specification must retain the above copyright
notice, this list of conditions and the following disclaimer. ● # notice, this list of conditions and the following disclaimer. ●
Redistributions of implementations of the DIAL Specification in source code # Redistributions of implementations of the DIAL Specification in source code
form must retain the above copyright notice, this list of conditions and the # form must retain the above copyright notice, this list of conditions and the
following disclaimer. ● Redistributions of implementations of the DIAL # following disclaimer. ● Redistributions of implementations of the DIAL
Specification in binary form must include the above copyright notice. ● The # Specification in binary form must include the above copyright notice. ● The
DIAL mark, the NETFLIX mark and the names of contributors to the DIAL # DIAL mark, the NETFLIX mark and the names of contributors to the DIAL
Specification may not be used to endorse or promote specifications, software, # Specification may not be used to endorse or promote specifications, software,
products, or any other materials derived from the DIAL Specification without # products, or any other materials derived from the DIAL Specification without
specific prior written permission. The DIAL mark is owned by Netflix and # specific prior written permission. The DIAL mark is owned by Netflix and
information on licensing the DIAL mark is available at # information on licensing the DIAL mark is available at
www.dial-multiscreen.org.""" # www.dial-multiscreen.org.
"""
MIT License
Copyright (c) 2018 Johannes Hoppe # MIT License
Permission is hereby granted, free of charge, to any person obtaining a copy # Copyright (c) 2018 Johannes Hoppe
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all # Permission is hereby granted, free of charge, to any person obtaining a copy
copies or substantial portions of the Software. # of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # The above copyright notice and this permission notice shall be included in all
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # copies or substantial portions of the Software.
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
SOFTWARE.""" # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
"""Modified code from # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
https://github.com/codingjoe/ssdp/blob/main/ssdp/__main__.py""" # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Modified code from
# https://github.com/codingjoe/ssdp/blob/main/ssdp/__main__.py
def get_ip(): def get_ip():
@@ -81,6 +82,11 @@ class Handler(ssdp.aio.SSDP):
if "location" in headers: if "location" in headers:
self.devices.append(headers["location"]) self.devices.append(headers["location"])
def request_received(self, request: ssdp.messages.SSDPRequest, addr):
raise NotImplementedError(
"Request received is not implemented, this is a client"
)
async def find_youtube_app(web_session, url_location): async def find_youtube_app(web_session, url_location):
async with web_session.get(url_location) as response: async with web_session.get(url_location) as response:
@@ -111,21 +117,21 @@ async def discover(web_session):
search_target = "urn:dial-multiscreen-org:service:dial:1" search_target = "urn:dial-multiscreen-org:service:dial:1"
max_wait = 10 max_wait = 10
handler = Handler() handler = Handler()
"""Send out an M-SEARCH request and listening for responses.""" # Send out an M-SEARCH request and listening for responses
family, addr = network.get_best_family(bind, network.PORT) family, _ = network.get_best_family(bind, network.PORT)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
ip_address = get_ip() ip_address = get_ip()
connect = loop.create_datagram_endpoint( connect = loop.create_datagram_endpoint(
handler, family=family, local_addr=(ip_address, None) handler, family=family, local_addr=(ip_address, None)
) )
transport, protocol = await connect transport, _ = await connect
target = network.MULTICAST_ADDRESS_IPV4, network.PORT target = network.MULTICAST_ADDRESS_IPV4, network.PORT
search_request = ssdp.messages.SSDPRequest( search_request = ssdp.messages.SSDPRequest(
"M-SEARCH", "M-SEARCH",
headers={ headers={
"HOST": "%s:%d" % target, "HOST": f"{target[0]}:{target[1]}",
"MAN": '"ssdp:discover"', "MAN": '"ssdp:discover"',
"MX": str(max_wait), # seconds to delay response [1..5] "MX": str(max_wait), # seconds to delay response [1..5]
"ST": search_target, "ST": search_target,
@@ -136,7 +142,6 @@ async def discover(web_session):
search_request.sendto(transport, target) search_request.sendto(transport, target)
# print(search_request, addr[:2])
try: try:
await asyncio.sleep(4) await asyncio.sleep(4)
finally: finally:

View File

@@ -8,7 +8,7 @@ import rich_click as click
from appdirs import user_data_dir from appdirs import user_data_dir
from . import config_setup, main, setup_wizard from . import config_setup, main, setup_wizard
from .constants import config_file_blacklist_keys from .constants import config_file_blacklist_keys, github_wiki_base_url
class Device: class Device:
@@ -51,8 +51,8 @@ class Config:
( (
"The atvs config option is deprecated and has stopped working." "The atvs config option is deprecated and has stopped working."
" Please read this for more information " " Please read this for more information "
"on how to upgrade to V2:" "on how to upgrade to V2:\n"
" \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2" f"{github_wiki_base_url}/Migrate-from-V1-to-V2"
), ),
) )
print("Exiting in 10 seconds...") print("Exiting in 10 seconds...")
@@ -90,7 +90,7 @@ class Config:
print( print(
"Running in docker without mounting the data dir, check the" "Running in docker without mounting the data dir, check the"
" wiki for more information: " " wiki for more information: "
"https://github.com/dmunozv04/iSponsorBlockTV/wiki/Installation#Docker" f"{github_wiki_base_url}/Installation#Docker"
) )
print( print(
( (
@@ -101,7 +101,7 @@ class Config:
"Please read this for more information on how to upgrade" "Please read this for more information on how to upgrade"
" to V2:" " to V2:"
), ),
"https://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2", f"{github_wiki_base_url}/Migrate-from-V1-to-V2",
) )
print("Exiting in 10 seconds...") print("Exiting in 10 seconds...")
time.sleep(10) time.sleep(10)
@@ -126,6 +126,9 @@ class Config:
return self.__dict__ == other.__dict__ return self.__dict__ == other.__dict__
return False return False
def __hash__(self):
return hash(tuple(sorted(self.items())))
@click.group(invoke_without_command=True) @click.group(invoke_without_command=True)
@click.option( @click.option(
@@ -175,29 +178,23 @@ def cli(ctx, data, debug, setup, setup_cli):
ctx.invoke(start) ctx.invoke(start)
@cli.command() @cli.command(name="setup")
@click.pass_context @click.pass_context
def setup(ctx): def setup_command(ctx):
"""Setup the program graphically""" """Setup the program graphically"""
config = Config(ctx.obj["data_dir"]) config = Config(ctx.obj["data_dir"])
setup_wizard.main(config) setup_wizard.main(config)
sys.exit() sys.exit()
setup_command = setup @cli.command(name="setup-cli")
@cli.command()
@click.pass_context @click.pass_context
def setup_cli(ctx): def setup_cli_command(ctx):
"""Setup the program in the command line""" """Setup the program in the command line"""
config = Config(ctx.obj["data_dir"]) config = Config(ctx.obj["data_dir"])
config_setup.main(config, ctx.obj["debug"]) config_setup.main(config, ctx.obj["debug"])
setup_cli_command = setup_cli
@cli.command() @cli.command()
@click.pass_context @click.pass_context
def start(ctx): def start(ctx):

View File

@@ -1,57 +0,0 @@
import os
import plistlib
from . import config_setup
"""Not updated to V2 yet, should still work. Here be dragons"""
default_plist = {
"Label": "com.dmunozv04iSponsorBlockTV",
"RunAtLoad": True,
"StartInterval": 20,
"EnvironmentVariables": {"PYTHONUNBUFFERED": "YES"},
"StandardErrorPath": "", # Fill later
"StandardOutPath": "",
"ProgramArguments": "",
"WorkingDirectory": "",
}
def create_plist(path):
plist = default_plist
plist["ProgramArguments"] = [path + "/iSponsorBlockTV-macos"]
plist["StandardErrorPath"] = path + "/iSponsorBlockTV.error.log"
plist["StandardOutPath"] = path + "/iSponsorBlockTV.out.log"
plist["WorkingDirectory"] = path
launchd_path = os.path.expanduser("~/Library/LaunchAgents/")
path_to_save = launchd_path + "com.dmunozv04.iSponsorBlockTV.plist"
with open(path_to_save, "wb") as fp:
plistlib.dump(plist, fp)
def run_setup(file):
config = {}
config_setup.main(config, file, debug=False)
def main():
correct_path = os.path.expanduser("~/iSponsorBlockTV")
if os.path.isfile(correct_path + "/iSponsorBlockTV-macos"):
print("Program is on the right path")
print("The launch daemon will now be installed")
create_plist(correct_path)
run_setup(correct_path + "/config.json")
print(
"Launch daemon installed. Please restart the computer to enable it or"
" use:\n launchctl load"
" ~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist"
)
else:
if not os.path.exists(correct_path):
os.makedirs(correct_path)
print(
"Please move the program to the correct path: "
+ correct_path
+ "opening now on finder..."
)
os.system("open -R " + correct_path)

View File

@@ -29,14 +29,12 @@ class DeviceListener:
try: try:
await self.lounge_controller.refresh_auth() await self.lounge_controller.refresh_auth()
except BaseException: except BaseException:
# traceback.print_exc()
pass pass
async def is_available(self): async def is_available(self):
try: try:
return await self.lounge_controller.is_available() return await self.lounge_controller.is_available()
except BaseException: except BaseException:
# traceback.print_exc()
return False return False
# Main subscription loop # Main subscription loop
@@ -90,7 +88,7 @@ class DeviceListener:
segments = await self.api_helper.get_segments(state.videoId) segments = await self.api_helper.get_segments(state.videoId)
if state.state.value == 1: # Playing if state.state.value == 1: # Playing
self.logger.info( self.logger.info(
f"Playing video {state.videoId} with {len(segments)} segments" "Playing video %s with %d segments", state.videoId, len(segments)
) )
if segments: # If there are segments if segments: # If there are segments
await self.time_to_segment(segments, state.currentTime, time_start) await self.time_to_segment(segments, state.currentTime, time_start)

View File

@@ -79,7 +79,7 @@ class Element(Static):
self.tooltip = tooltip self.tooltip = tooltip
def process_values_from_data(self): def process_values_from_data(self):
pass raise NotImplementedError("Subclasses must implement this method.")
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Button( yield Button(
@@ -229,7 +229,8 @@ class ExitScreen(ModalWithClickExit):
class AddDevice(ModalWithClickExit): class AddDevice(ModalWithClickExit):
"""Screen with a dialog to add a device, either with a pairing code or with lan discovery.""" """Screen with a dialog to add a device, either with a pairing code
or with lan discovery."""
BINDINGS = [("escape", "dismiss({})", "Return")] BINDINGS = [("escape", "dismiss({})", "Return")]
@@ -387,7 +388,8 @@ class AddDevice(ModalWithClickExit):
class AddChannel(ModalWithClickExit): class AddChannel(ModalWithClickExit):
"""Screen with a dialog to add a channel, either using search or with a channel id.""" """Screen with a dialog to add a channel,
either using search or with a channel id."""
BINDINGS = [("escape", "dismiss(())", "Return")] BINDINGS = [("escape", "dismiss(())", "Return")]
@@ -553,7 +555,7 @@ class EditDevice(ModalWithClickExit):
def action_close_screen_saving(self) -> None: def action_close_screen_saving(self) -> None:
self.dismiss() self.dismiss()
def dismiss(self) -> None: def dismiss(self, _=None) -> None:
self.device_data["name"] = self.query_one("#device-name-input").value self.device_data["name"] = self.query_one("#device-name-input").value
self.device_data["screen_id"] = self.query_one("#device-id-input").value self.device_data["screen_id"] = self.query_one("#device-id-input").value
self.device_data["offset"] = int(self.query_one("#device-offset-input").value) self.device_data["offset"] = int(self.query_one("#device-offset-input").value)
@@ -793,7 +795,8 @@ class AdSkipMuteManager(Vertical):
class ChannelWhitelistManager(Vertical): class ChannelWhitelistManager(Vertical):
"""Manager for channel whitelist, allows adding/removing channels from the whitelist.""" """Manager for channel whitelist,
allows adding/removing channels from the whitelist."""
def __init__(self, config, **kwargs) -> None: def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs) super().__init__(**kwargs)
@@ -884,8 +887,6 @@ class AutoPlayManager(Vertical):
class ISponsorBlockTVSetupMainScreen(Screen): class ISponsorBlockTVSetupMainScreen(Screen):
"""Making this a separate screen to avoid a bug: https://github.com/Textualize/textual/issues/3221"""
TITLE = "iSponsorBlockTV" TITLE = "iSponsorBlockTV"
SUB_TITLE = "Setup Wizard" SUB_TITLE = "Setup Wizard"
BINDINGS = [("q,ctrl+c", "exit_modal", "Exit"), ("s", "save", "Save")] BINDINGS = [("q,ctrl+c", "exit_modal", "Exit"), ("s", "save", "Save")]
@@ -926,7 +927,6 @@ class ISponsorBlockTVSetupMainScreen(Screen):
def on_mount(self) -> None: def on_mount(self) -> None:
if self.check_for_old_config_entries(): if self.check_for_old_config_entries():
self.app.push_screen(MigrationScreen()) self.app.push_screen(MigrationScreen())
pass
def action_save(self) -> None: def action_save(self) -> None:
self.config.save() self.config.save()

View File

@@ -70,8 +70,9 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
pass pass
finally: finally:
self.subscribe_task_watchdog = asyncio.create_task(self._watchdog()) self.subscribe_task_watchdog = asyncio.create_task(self._watchdog())
# A bunch of events useful to detect ads playing, and the next video before it starts playing (that way we # A bunch of events useful to detect ads playing,
# can get the segments) # and the next video before it starts playing
# (that way we can get the segments)
if event_type == "onStateChange": if event_type == "onStateChange":
data = args[0] data = args[0]
# print(data) # print(data)
@@ -100,10 +101,10 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
): # Seen multiple other adStates, assuming they are all ads ): # Seen multiple other adStates, assuming they are all ads
self.logger.info("Ad has started, muting") self.logger.info("Ad has started, muting")
create_task(self.mute(True, override=True)) create_task(self.mute(True, override=True))
# Manages volume, useful since YouTube wants to know the volume when unmuting (even if they already have it) # Manages volume, useful since YouTube wants to know the volume
# when unmuting (even if they already have it)
elif event_type == "onVolumeChanged": elif event_type == "onVolumeChanged":
self.volume_state = args[0] self.volume_state = args[0]
pass
# Gets segments for the next video before it starts playing # Gets segments for the next video before it starts playing
elif event_type == "autoplayUpNext": elif event_type == "autoplayUpNext":
if len(args) > 0 and ( if len(args) > 0 and (
@@ -168,16 +169,22 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
async def set_volume(self, volume: int) -> None: async def set_volume(self, volume: int) -> None:
await super()._command("setVolume", {"volume": volume}) await super()._command("setVolume", {"volume": volume})
# Mute or unmute the device (if the device already is in the desired state, nothing happens)
# mute: True to mute, False to unmute
# override: If True, the command is sent even if the device already is in the desired state
# TODO: Only works if the device is subscribed to the lounge
async def mute(self, mute: bool, override: bool = False) -> None: async def mute(self, mute: bool, override: bool = False) -> None:
"""
Mute or unmute the device (if the device already
is in the desired state, nothing happens)
:param bool mute: True to mute, False to unmute
:param bool override: If True, the command is sent even if the
device already is in the desired state
TODO: Only works if the device is subscribed to the lounge
"""
if mute: if mute:
mute_str = "true" mute_str = "true"
else: else:
mute_str = "false" mute_str = "false"
if override or not (self.volume_state.get("muted", "false") == mute_str): if override or not self.volume_state.get("muted", "false") == mute_str:
self.volume_state["muted"] = mute_str self.volume_state["muted"] = mute_str
# YouTube wants the volume when unmuting, so we send it # YouTube wants the volume when unmuting, so we send it
await super()._command( await super()._command(