Compare commits

..

2 Commits

Author SHA1 Message Date
pre-commit-ci[bot]
7d3b1c6469 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2024-08-18 15:38:13 +00:00
dmunozv04
b7a295b3e2 create service handlers for macOS 2024-08-18 17:36:29 +02:00
13 changed files with 390 additions and 309 deletions

View File

@@ -179,8 +179,6 @@ jobs:
publish-to-release:
permissions:
contents: write
needs:
- build-sdist-and-wheel
- build-binaries

View File

@@ -18,6 +18,5 @@
{"id": "",
"name": ""
}
],
"api_server": "https://sponsor.ajay.app"
]
}

View File

@@ -1,6 +1,6 @@
[project]
name = "iSponsorBlockTV"
version = "2.2.1"
version = "2.1.0"
authors = [
{"name" = "dmunozv04"}
]

View File

@@ -27,7 +27,6 @@ class ApiHelper:
self.skip_count_tracking = config.skip_count_tracking
self.web_session = web_session
self.num_devices = len(config.devices)
self.api_server = config.api_server
# Not used anymore, maybe it can stay here a little longer
@AsyncLRU(maxsize=10)
@@ -131,7 +130,7 @@ class ApiHelper:
"service": constants.SponsorBlock_service,
}
headers = {"Accept": "application/json"}
url = self.api_server + "/api/skipSegments/" + vid_id_hashed
url = constants.SponsorBlock_api + "skipSegments/" + vid_id_hashed
async with self.web_session.get(
url, headers=headers, params=params
) as response:
@@ -202,7 +201,7 @@ class ApiHelper:
Lets the contributor know that someone skipped the segment (thanks)"""
if self.skip_count_tracking:
for i in uuids:
url = self.api_server + "/api/viewedVideoSponsorTime/"
url = constants.SponsorBlock_api + "viewedVideoSponsorTime/"
params = {"UUID": i}
await self.web_session.post(url, params=params)

View File

@@ -36,9 +36,6 @@ REPORT_SKIPPED_SEGMENTS_PROMPT = (
MUTE_ADS_PROMPT = "Do you want to mute native YouTube ads automatically? (y/N) "
SKIP_ADS_PROMPT = "Do you want to skip native YouTube ads automatically? (y/N) "
AUTOPLAY_PROMPT = "Do you want to enable autoplay? (Y/n) "
ENTER_API_SERVER_PROMPT = (
"Enter the custom API server URL (leave blank to use default): "
)
def get_yn_input(prompt):
@@ -193,10 +190,6 @@ def main(config, debug: bool) -> None:
choice = get_yn_input(AUTOPLAY_PROMPT)
config.auto_play = choice != "n"
api_server = input(ENTER_API_SERVER_PROMPT)
if api_server:
config.api_server = api_server
print("Config finished")
config.save()
loop.run_until_complete(web_session.close())

View File

@@ -2,6 +2,7 @@ userAgent = "iSponsorBlockTV/0.1"
SponsorBlock_service = "youtube"
SponsorBlock_actiontype = "skip"
SponsorBlock_api = "https://sponsor.ajay.app/api/"
Youtube_api = "https://www.googleapis.com/youtube/v3/"
skip_categories = (
@@ -19,4 +20,5 @@ skip_categories = (
youtube_client_blacklist = ["TVHTML5_FOR_KIDS"]
config_file_blacklist_keys = ["config_file", "data_dir"]

View File

@@ -9,6 +9,7 @@ from appdirs import user_data_dir
from . import config_setup, main, setup_wizard
from .constants import config_file_blacklist_keys
from .service.service_helpers import service
class Device:
@@ -42,7 +43,6 @@ class Config:
self.mute_ads = False
self.skip_ads = False
self.auto_play = True
self.api_server = "https://sponsor.ajay.app"
self.__load()
def validate(self):
@@ -214,6 +214,8 @@ pyapp_group.add_command(
if os.getenv("PYAPP"):
cli.add_command(pyapp_group)
cli.add_command(service)
def app_start():
cli(obj={})

View File

@@ -1,57 +0,0 @@
import os
import plistlib
from . import config_setup
"""Not updated to V2 yet, should still work. Here be dragons"""
default_plist = {
"Label": "com.dmunozv04iSponsorBlockTV",
"RunAtLoad": True,
"StartInterval": 20,
"EnvironmentVariables": {"PYTHONUNBUFFERED": "YES"},
"StandardErrorPath": "", # Fill later
"StandardOutPath": "",
"ProgramArguments": "",
"WorkingDirectory": "",
}
def create_plist(path):
plist = default_plist
plist["ProgramArguments"] = [path + "/iSponsorBlockTV-macos"]
plist["StandardErrorPath"] = path + "/iSponsorBlockTV.error.log"
plist["StandardOutPath"] = path + "/iSponsorBlockTV.out.log"
plist["WorkingDirectory"] = path
launchd_path = os.path.expanduser("~/Library/LaunchAgents/")
path_to_save = launchd_path + "com.dmunozv04.iSponsorBlockTV.plist"
with open(path_to_save, "wb") as fp:
plistlib.dump(plist, fp)
def run_setup(file):
config = {}
config_setup.main(config, file, debug=False)
def main():
correct_path = os.path.expanduser("~/iSponsorBlockTV")
if os.path.isfile(correct_path + "/iSponsorBlockTV-macos"):
print("Program is on the right path")
print("The launch daemon will now be installed")
create_plist(correct_path)
run_setup(correct_path + "/config.json")
print(
"Launch daemon installed. Please restart the computer to enable it or"
" use:\n launchctl load"
" ~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist"
)
else:
if not os.path.exists(correct_path):
os.makedirs(correct_path)
print(
"Please move the program to the correct path: "
+ correct_path
+ "opening now on finder..."
)
os.system("open -R " + correct_path)

View File

View File

@@ -0,0 +1,76 @@
import os
import sys
import rich_click as click
from .service_managers import select_service_manager
@click.group()
@click.pass_context
def service(ctx):
"""Manage the program as a service (executable only)"""
ctx.ensure_object(dict)
if os.getenv("PYAPP") is None:
print(
"Service commands are only available in the executable version of the"
" program"
)
sys.exit(1)
ctx.obj["service_manager"] = select_service_manager()(os.getenv("PYAPP"))
@service.command()
@click.pass_context
def start(ctx):
"""Start the service"""
ctx.obj["service_manager"].start()
@service.command()
@click.pass_context
def stop(ctx):
"""Stop the service"""
ctx.obj["service_manager"].stop()
@service.command()
@click.pass_context
def restart(ctx):
"""Restart the service"""
ctx.obj["service_manager"].restart()
@service.command()
@click.pass_context
def status(ctx):
"""Get the status of the service"""
ctx.obj["service_manager"].status()
@service.command()
@click.pass_context
def install(ctx):
"""Install the service"""
ctx.obj["service_manager"].install()
@service.command()
@click.pass_context
def uninstall(ctx):
"""Uninstall the service"""
ctx.obj["service_manager"].uninstall()
@service.command()
@click.pass_context
def enable(ctx):
"""Enable the service"""
ctx.obj["service_manager"].enable()
@service.command()
@click.pass_context
def disable(ctx):
"""Disable the service"""
ctx.obj["service_manager"].disable()

View File

@@ -0,0 +1,107 @@
import os
import plistlib
import subprocess
from platform import system
from appdirs import user_log_dir
def select_service_manager() -> "ServiceManager":
platform = system()
if platform == "Darwin":
return Launchd
elif platform == "Linux":
return Systemd
else:
raise NotImplementedError("Unsupported platform")
class ServiceManager:
def __init__(self, executable_path, *args, **kwargs):
self.executable_path = executable_path
def start(self):
pass
def stop(self):
pass
def restart(self):
pass
def status(self):
pass
def install(self):
pass
def uninstall(self):
pass
def enable(self):
pass
def disable(self):
pass
class Launchd(ServiceManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.service_name = "com.dmunozv04.iSponsorBlockTV"
self.service_path = (
os.path.expanduser("~/Library/LaunchAgents/") + self.service_name + ".plist"
)
def start(self):
subprocess.run(["launchctl", "start", self.service_name])
def stop(self):
subprocess.run(["launchctl", "stop", self.service_name])
def restart(self):
subprocess.run(["launchctl", "restart", self.service_name])
def status(self):
subprocess.run(["launchctl", "list", self.service_name])
def install(self):
if os.path.exists(self.service_path):
print("Service already installed")
return
logs_dir = user_log_dir("iSponsorBlockTV", "dmunozv04")
# ensure the logs directory exists
os.makedirs(logs_dir, exist_ok=True)
plist = {
"Label": "com.dmunozv04.iSponsorBlockTV",
"RunAtLoad": True,
"StartInterval": 20,
"EnvironmentVariables": {"PYTHONUNBUFFERED": "YES"},
"StandardErrorPath": logs_dir + "/iSponsorBlockTV.err",
"StandardOutPath": logs_dir + "/iSponsorBlockTV.out",
"Program": self.executable_path,
}
with open(self.service_path, "wb") as fp:
plistlib.dump(plist, fp)
print("Service installed")
self.enable()
def uninstall(self):
self.disable()
# Remove the file
try:
os.remove(self.service_path)
print("Service uninstalled")
except FileNotFoundError:
print("Service not found")
def enable(self):
subprocess.run(["launchctl", "load", self.service_path])
def disable(self):
subprocess.run(["launchctl", "stop", self.service_name])
subprocess.run(["launchctl", "unload", self.service_path])
class Systemd(ServiceManager):
pass

View File

@@ -876,31 +876,6 @@ class AutoPlayManager(Vertical):
self.config.auto_play = event.checkbox.value
class ApiServerManager(Vertical):
"""Manager for the custom API server URL."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
def compose(self) -> ComposeResult:
yield Label("Custom API Server", classes="title")
yield Label(
"You can specify a custom SponsorBlock API server URL here.",
classes="subtitle",
)
with Grid(id="api-server-grid"):
yield Input(
placeholder="Custom API Server URL",
id="api-server-input",
value=self.config.api_server,
)
@on(Input.Changed, "#api-server-input")
def changed_api_server(self, event: Input.Changed):
self.config.api_server = event.input.value
class ISponsorBlockTVSetupMainScreen(Screen):
"""Making this a separate screen to avoid a bug: https://github.com/Textualize/textual/issues/3221"""
@@ -940,9 +915,6 @@ class ISponsorBlockTVSetupMainScreen(Screen):
yield AutoPlayManager(
config=self.config, id="autoplay-manager", classes="container"
)
yield ApiServerManager(
config=self.config, id="api-server-manager", classes="container"
)
def on_mount(self) -> None:
if self.check_for_old_config_entries():

View File

@@ -35,7 +35,6 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
self.mute_ads = config.mute_ads
self.skip_ads = config.skip_ads
self.auto_play = config.auto_play
self._command_mutex = asyncio.Lock()
# Ensures that we still are subscribed to the lounge
async def _watchdog(self):
@@ -146,11 +145,8 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
self.shorts_disconnected = False
create_task(self.play_video(video_id_saved))
elif event_type == "loungeScreenDisconnected":
if args: # Sometimes it's empty
data = args[0]
if (
data["reason"] == "disconnectedByUserScreenInitiated"
): # Short playing?
if data["reason"] == "disconnectedByUserScreenInitiated": # Short playing?
self.shorts_disconnected = True
elif event_type == "onAutoplayModeChanged":
create_task(self.set_auto_play_mode(self.auto_play))
@@ -185,9 +181,3 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
async def play_video(self, video_id: str) -> bool:
return await self._command("setPlaylist", {"videoId": video_id})
# Test to wrap the command function in a mutex to avoid race conditions with
# the _command_offset (TODO: move to upstream if it works)
async def _command(self, command: str, command_parameters: dict = None) -> bool:
async with self._command_mutex:
return await super()._command(command, command_parameters)