Compare commits

..

5 Commits

Author SHA1 Message Date
David
63f5a3bc41 Bump version 2024-09-18 15:17:29 +02:00
David
e999a93503 Merge pull request #191 from dmunozv04/add-mutex-command
Implements mutex when sending commands to YouTube
2024-09-18 15:17:07 +02:00
pre-commit-ci[bot]
8cc3f8aa05 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2024-09-15 13:20:36 +00:00
dmunozv04
5fadc81a69 Fix occasional IndexError
in loungeScreenDisconnected event
2024-09-15 14:49:39 +02:00
dmunozv04
39aef5babf Test wrap command function in a mutex
to avoid race conditions with the _command_offset
2024-09-14 23:44:32 +02:00
7 changed files with 71 additions and 190 deletions

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "iSponsorBlockTV" name = "iSponsorBlockTV"
version = "2.1.0" version = "2.1.1"
authors = [ authors = [
{"name" = "dmunozv04"} {"name" = "dmunozv04"}
] ]

View File

@@ -9,7 +9,6 @@ from appdirs import user_data_dir
from . import config_setup, main, setup_wizard from . import config_setup, main, setup_wizard
from .constants import config_file_blacklist_keys from .constants import config_file_blacklist_keys
from .service.service_helpers import service
class Device: class Device:
@@ -214,8 +213,6 @@ pyapp_group.add_command(
if os.getenv("PYAPP"): if os.getenv("PYAPP"):
cli.add_command(pyapp_group) cli.add_command(pyapp_group)
cli.add_command(service)
def app_start(): def app_start():
cli(obj={}) cli(obj={})

View File

@@ -0,0 +1,57 @@
import os
import plistlib
from . import config_setup
"""Not updated to V2 yet, should still work. Here be dragons"""
default_plist = {
"Label": "com.dmunozv04iSponsorBlockTV",
"RunAtLoad": True,
"StartInterval": 20,
"EnvironmentVariables": {"PYTHONUNBUFFERED": "YES"},
"StandardErrorPath": "", # Fill later
"StandardOutPath": "",
"ProgramArguments": "",
"WorkingDirectory": "",
}
def create_plist(path):
plist = default_plist
plist["ProgramArguments"] = [path + "/iSponsorBlockTV-macos"]
plist["StandardErrorPath"] = path + "/iSponsorBlockTV.error.log"
plist["StandardOutPath"] = path + "/iSponsorBlockTV.out.log"
plist["WorkingDirectory"] = path
launchd_path = os.path.expanduser("~/Library/LaunchAgents/")
path_to_save = launchd_path + "com.dmunozv04.iSponsorBlockTV.plist"
with open(path_to_save, "wb") as fp:
plistlib.dump(plist, fp)
def run_setup(file):
config = {}
config_setup.main(config, file, debug=False)
def main():
correct_path = os.path.expanduser("~/iSponsorBlockTV")
if os.path.isfile(correct_path + "/iSponsorBlockTV-macos"):
print("Program is on the right path")
print("The launch daemon will now be installed")
create_plist(correct_path)
run_setup(correct_path + "/config.json")
print(
"Launch daemon installed. Please restart the computer to enable it or"
" use:\n launchctl load"
" ~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist"
)
else:
if not os.path.exists(correct_path):
os.makedirs(correct_path)
print(
"Please move the program to the correct path: "
+ correct_path
+ "opening now on finder..."
)
os.system("open -R " + correct_path)

View File

@@ -1,76 +0,0 @@
import os
import sys
import rich_click as click
from .service_managers import select_service_manager
@click.group()
@click.pass_context
def service(ctx):
"""Manage the program as a service (executable only)"""
ctx.ensure_object(dict)
if os.getenv("PYAPP") is None:
print(
"Service commands are only available in the executable version of the"
" program"
)
sys.exit(1)
ctx.obj["service_manager"] = select_service_manager()(os.getenv("PYAPP"))
@service.command()
@click.pass_context
def start(ctx):
"""Start the service"""
ctx.obj["service_manager"].start()
@service.command()
@click.pass_context
def stop(ctx):
"""Stop the service"""
ctx.obj["service_manager"].stop()
@service.command()
@click.pass_context
def restart(ctx):
"""Restart the service"""
ctx.obj["service_manager"].restart()
@service.command()
@click.pass_context
def status(ctx):
"""Get the status of the service"""
ctx.obj["service_manager"].status()
@service.command()
@click.pass_context
def install(ctx):
"""Install the service"""
ctx.obj["service_manager"].install()
@service.command()
@click.pass_context
def uninstall(ctx):
"""Uninstall the service"""
ctx.obj["service_manager"].uninstall()
@service.command()
@click.pass_context
def enable(ctx):
"""Enable the service"""
ctx.obj["service_manager"].enable()
@service.command()
@click.pass_context
def disable(ctx):
"""Disable the service"""
ctx.obj["service_manager"].disable()

View File

@@ -1,107 +0,0 @@
import os
import plistlib
import subprocess
from platform import system
from appdirs import user_log_dir
def select_service_manager() -> "ServiceManager":
platform = system()
if platform == "Darwin":
return Launchd
elif platform == "Linux":
return Systemd
else:
raise NotImplementedError("Unsupported platform")
class ServiceManager:
def __init__(self, executable_path, *args, **kwargs):
self.executable_path = executable_path
def start(self):
pass
def stop(self):
pass
def restart(self):
pass
def status(self):
pass
def install(self):
pass
def uninstall(self):
pass
def enable(self):
pass
def disable(self):
pass
class Launchd(ServiceManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.service_name = "com.dmunozv04.iSponsorBlockTV"
self.service_path = (
os.path.expanduser("~/Library/LaunchAgents/") + self.service_name + ".plist"
)
def start(self):
subprocess.run(["launchctl", "start", self.service_name])
def stop(self):
subprocess.run(["launchctl", "stop", self.service_name])
def restart(self):
subprocess.run(["launchctl", "restart", self.service_name])
def status(self):
subprocess.run(["launchctl", "list", self.service_name])
def install(self):
if os.path.exists(self.service_path):
print("Service already installed")
return
logs_dir = user_log_dir("iSponsorBlockTV", "dmunozv04")
# ensure the logs directory exists
os.makedirs(logs_dir, exist_ok=True)
plist = {
"Label": "com.dmunozv04.iSponsorBlockTV",
"RunAtLoad": True,
"StartInterval": 20,
"EnvironmentVariables": {"PYTHONUNBUFFERED": "YES"},
"StandardErrorPath": logs_dir + "/iSponsorBlockTV.err",
"StandardOutPath": logs_dir + "/iSponsorBlockTV.out",
"Program": self.executable_path,
}
with open(self.service_path, "wb") as fp:
plistlib.dump(plist, fp)
print("Service installed")
self.enable()
def uninstall(self):
self.disable()
# Remove the file
try:
os.remove(self.service_path)
print("Service uninstalled")
except FileNotFoundError:
print("Service not found")
def enable(self):
subprocess.run(["launchctl", "load", self.service_path])
def disable(self):
subprocess.run(["launchctl", "stop", self.service_name])
subprocess.run(["launchctl", "unload", self.service_path])
class Systemd(ServiceManager):
pass

View File

@@ -35,6 +35,7 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
self.mute_ads = config.mute_ads self.mute_ads = config.mute_ads
self.skip_ads = config.skip_ads self.skip_ads = config.skip_ads
self.auto_play = config.auto_play self.auto_play = config.auto_play
self._command_mutex = asyncio.Lock()
# Ensures that we still are subscribed to the lounge # Ensures that we still are subscribed to the lounge
async def _watchdog(self): async def _watchdog(self):
@@ -145,9 +146,12 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
self.shorts_disconnected = False self.shorts_disconnected = False
create_task(self.play_video(video_id_saved)) create_task(self.play_video(video_id_saved))
elif event_type == "loungeScreenDisconnected": elif event_type == "loungeScreenDisconnected":
data = args[0] if args: # Sometimes it's empty
if data["reason"] == "disconnectedByUserScreenInitiated": # Short playing? data = args[0]
self.shorts_disconnected = True if (
data["reason"] == "disconnectedByUserScreenInitiated"
): # Short playing?
self.shorts_disconnected = True
elif event_type == "onAutoplayModeChanged": elif event_type == "onAutoplayModeChanged":
create_task(self.set_auto_play_mode(self.auto_play)) create_task(self.set_auto_play_mode(self.auto_play))
@@ -181,3 +185,9 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
async def play_video(self, video_id: str) -> bool: async def play_video(self, video_id: str) -> bool:
return await self._command("setPlaylist", {"videoId": video_id}) return await self._command("setPlaylist", {"videoId": video_id})
# Test to wrap the command function in a mutex to avoid race conditions with
# the _command_offset (TODO: move to upstream if it works)
async def _command(self, command: str, command_parameters: dict = None) -> bool:
async with self._command_mutex:
return await super()._command(command, command_parameters)