mirror of
https://github.com/dmunozv04/iSponsorBlockTV.git
synced 2025-12-06 20:06:44 +03:00
Compare commits
2 Commits
add-custom
...
service
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d3b1c6469 | ||
|
|
b7a295b3e2 |
2
.github/workflows/release.yml
vendored
2
.github/workflows/release.yml
vendored
@@ -179,8 +179,6 @@ jobs:
|
||||
|
||||
|
||||
publish-to-release:
|
||||
permissions:
|
||||
contents: write
|
||||
needs:
|
||||
- build-sdist-and-wheel
|
||||
- build-binaries
|
||||
|
||||
@@ -18,6 +18,5 @@
|
||||
{"id": "",
|
||||
"name": ""
|
||||
}
|
||||
],
|
||||
"api_server": "https://sponsor.ajay.app"
|
||||
]
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "iSponsorBlockTV"
|
||||
version = "2.2.1"
|
||||
version = "2.1.0"
|
||||
authors = [
|
||||
{"name" = "dmunozv04"}
|
||||
]
|
||||
|
||||
@@ -27,7 +27,6 @@ class ApiHelper:
|
||||
self.skip_count_tracking = config.skip_count_tracking
|
||||
self.web_session = web_session
|
||||
self.num_devices = len(config.devices)
|
||||
self.api_server = config.api_server
|
||||
|
||||
# Not used anymore, maybe it can stay here a little longer
|
||||
@AsyncLRU(maxsize=10)
|
||||
@@ -131,7 +130,7 @@ class ApiHelper:
|
||||
"service": constants.SponsorBlock_service,
|
||||
}
|
||||
headers = {"Accept": "application/json"}
|
||||
url = self.api_server + "/api/skipSegments/" + vid_id_hashed
|
||||
url = constants.SponsorBlock_api + "skipSegments/" + vid_id_hashed
|
||||
async with self.web_session.get(
|
||||
url, headers=headers, params=params
|
||||
) as response:
|
||||
@@ -202,7 +201,7 @@ class ApiHelper:
|
||||
Lets the contributor know that someone skipped the segment (thanks)"""
|
||||
if self.skip_count_tracking:
|
||||
for i in uuids:
|
||||
url = self.api_server + "/api/viewedVideoSponsorTime/"
|
||||
url = constants.SponsorBlock_api + "viewedVideoSponsorTime/"
|
||||
params = {"UUID": i}
|
||||
await self.web_session.post(url, params=params)
|
||||
|
||||
|
||||
@@ -1,202 +1,195 @@
|
||||
import asyncio
|
||||
|
||||
import aiohttp
|
||||
|
||||
from . import api_helpers, ytlounge
|
||||
|
||||
# Constants for user input prompts
|
||||
ATVS_REMOVAL_PROMPT = (
|
||||
"Do you want to remove the legacy 'atvs' entry (the app won't start"
|
||||
" with it present)? (y/N) "
|
||||
)
|
||||
PAIRING_CODE_PROMPT = "Enter pairing code (found in Settings - Link with TV code): "
|
||||
ADD_MORE_DEVICES_PROMPT = "Paired with {num_devices} Device(s). Add more? (y/N) "
|
||||
CHANGE_API_KEY_PROMPT = "API key already specified. Change it? (y/N) "
|
||||
ADD_API_KEY_PROMPT = (
|
||||
"API key only needed for the channel whitelist function. Add it? (y/N) "
|
||||
)
|
||||
ENTER_API_KEY_PROMPT = "Enter your API key: "
|
||||
CHANGE_SKIP_CATEGORIES_PROMPT = "Skip categories already specified. Change them? (y/N) "
|
||||
ENTER_SKIP_CATEGORIES_PROMPT = (
|
||||
"Enter skip categories (space or comma sepparated) Options: [sponsor,"
|
||||
" selfpromo, exclusive_access, interaction, poi_highlight, intro, outro,"
|
||||
" preview, filler, music_offtopic]:\n"
|
||||
)
|
||||
WHITELIST_CHANNELS_PROMPT = (
|
||||
"Do you want to whitelist any channels from being ad-blocked? (y/N) "
|
||||
)
|
||||
SEARCH_CHANNEL_PROMPT = 'Enter a channel name or "/exit" to exit: '
|
||||
SELECT_CHANNEL_PROMPT = "Select one option of the above [0-6]: "
|
||||
ENTER_CHANNEL_ID_PROMPT = "Enter a channel ID: "
|
||||
ENTER_CUSTOM_CHANNEL_NAME_PROMPT = "Enter the channel name: "
|
||||
REPORT_SKIPPED_SEGMENTS_PROMPT = (
|
||||
"Do you want to report skipped segments to sponsorblock. Only the segment"
|
||||
" UUID will be sent? (Y/n) "
|
||||
)
|
||||
MUTE_ADS_PROMPT = "Do you want to mute native YouTube ads automatically? (y/N) "
|
||||
SKIP_ADS_PROMPT = "Do you want to skip native YouTube ads automatically? (y/N) "
|
||||
AUTOPLAY_PROMPT = "Do you want to enable autoplay? (Y/n) "
|
||||
ENTER_API_SERVER_PROMPT = (
|
||||
"Enter the custom API server URL (leave blank to use default): "
|
||||
)
|
||||
|
||||
|
||||
def get_yn_input(prompt):
|
||||
while choice := input(prompt):
|
||||
if choice.lower() in ["y", "n"]:
|
||||
return choice.lower()
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
|
||||
|
||||
async def pair_device():
|
||||
try:
|
||||
lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV")
|
||||
pairing_code = input(PAIRING_CODE_PROMPT)
|
||||
pairing_code = int(
|
||||
pairing_code.replace("-", "").replace(" ", "")
|
||||
) # remove dashes and spaces
|
||||
print("Pairing...")
|
||||
paired = await lounge_controller.pair(pairing_code)
|
||||
if not paired:
|
||||
print("Failed to pair device")
|
||||
return
|
||||
device = {
|
||||
"screen_id": lounge_controller.auth.screen_id,
|
||||
"name": lounge_controller.screen_name,
|
||||
}
|
||||
print(f"Paired device: {device['name']}")
|
||||
return device
|
||||
except Exception as e:
|
||||
print(f"Failed to pair device: {e}")
|
||||
return
|
||||
|
||||
|
||||
def main(config, debug: bool) -> None:
|
||||
print("Welcome to the iSponsorBlockTV cli setup wizard")
|
||||
loop = asyncio.get_event_loop_policy().get_event_loop()
|
||||
web_session = aiohttp.ClientSession()
|
||||
if debug:
|
||||
loop.set_debug(True)
|
||||
asyncio.set_event_loop(loop)
|
||||
if hasattr(config, "atvs"):
|
||||
print(
|
||||
"The atvs config option is deprecated and has stopped working. Please read"
|
||||
" this for more information on how to upgrade to V2:"
|
||||
" \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2"
|
||||
)
|
||||
choice = get_yn_input(ATVS_REMOVAL_PROMPT)
|
||||
if choice == "y":
|
||||
del config["atvs"]
|
||||
|
||||
devices = config.devices
|
||||
choice = get_yn_input(ADD_MORE_DEVICES_PROMPT.format(num_devices=len(devices)))
|
||||
while choice == "y":
|
||||
task = loop.create_task(pair_device())
|
||||
loop.run_until_complete(task)
|
||||
device = task.result()
|
||||
if device:
|
||||
devices.append(device)
|
||||
choice = get_yn_input(ADD_MORE_DEVICES_PROMPT.format(num_devices=len(devices)))
|
||||
config.devices = devices
|
||||
|
||||
apikey = config.apikey
|
||||
if apikey:
|
||||
choice = get_yn_input(CHANGE_API_KEY_PROMPT)
|
||||
if choice == "y":
|
||||
apikey = input(ENTER_API_KEY_PROMPT)
|
||||
else:
|
||||
choice = get_yn_input(ADD_API_KEY_PROMPT)
|
||||
if choice == "y":
|
||||
print(
|
||||
"Get youtube apikey here:"
|
||||
" https://developers.google.com/youtube/registering_an_application"
|
||||
)
|
||||
apikey = input(ENTER_API_KEY_PROMPT)
|
||||
config.apikey = apikey
|
||||
|
||||
skip_categories = config.skip_categories
|
||||
if skip_categories:
|
||||
choice = get_yn_input(CHANGE_SKIP_CATEGORIES_PROMPT)
|
||||
if choice == "y":
|
||||
categories = input(ENTER_SKIP_CATEGORIES_PROMPT)
|
||||
skip_categories = categories.replace(",", " ").split(" ")
|
||||
skip_categories = [
|
||||
x for x in skip_categories if x != ""
|
||||
] # Remove empty strings
|
||||
else:
|
||||
categories = input(ENTER_SKIP_CATEGORIES_PROMPT)
|
||||
skip_categories = categories.replace(",", " ").split(" ")
|
||||
skip_categories = [
|
||||
x for x in skip_categories if x != ""
|
||||
] # Remove empty strings
|
||||
config.skip_categories = skip_categories
|
||||
|
||||
channel_whitelist = config.channel_whitelist
|
||||
choice = get_yn_input(WHITELIST_CHANNELS_PROMPT)
|
||||
if choice == "y":
|
||||
if not apikey:
|
||||
print(
|
||||
"WARNING: You need to specify an API key to use this function,"
|
||||
" otherwise the program will fail to start.\nYou can add one by"
|
||||
" re-running this setup wizard."
|
||||
)
|
||||
api_helper = api_helpers.ApiHelper(config, web_session)
|
||||
while True:
|
||||
channel_info = {}
|
||||
channel = input(SEARCH_CHANNEL_PROMPT)
|
||||
if channel == "/exit":
|
||||
break
|
||||
|
||||
task = loop.create_task(
|
||||
api_helper.search_channels(channel, apikey, web_session)
|
||||
)
|
||||
loop.run_until_complete(task)
|
||||
results = task.result()
|
||||
if len(results) == 0:
|
||||
print("No channels found")
|
||||
continue
|
||||
|
||||
for i, item in enumerate(results):
|
||||
print(f"{i}: {item[1]} - Subs: {item[2]}")
|
||||
print("5: Enter a custom channel ID")
|
||||
print("6: Go back")
|
||||
|
||||
while choice := input(SELECT_CHANNEL_PROMPT):
|
||||
if choice in [str(x) for x in range(7)]:
|
||||
break
|
||||
print("Invalid choice")
|
||||
|
||||
if choice == "5":
|
||||
channel_info["id"] = input(ENTER_CHANNEL_ID_PROMPT)
|
||||
channel_info["name"] = input(ENTER_CUSTOM_CHANNEL_NAME_PROMPT)
|
||||
channel_whitelist.append(channel_info)
|
||||
continue
|
||||
if choice == "6":
|
||||
continue
|
||||
|
||||
channel_info["id"] = results[int(choice)][0]
|
||||
channel_info["name"] = results[int(choice)][1]
|
||||
channel_whitelist.append(channel_info)
|
||||
# Close web session asynchronously
|
||||
|
||||
config.channel_whitelist = channel_whitelist
|
||||
|
||||
choice = get_yn_input(REPORT_SKIPPED_SEGMENTS_PROMPT)
|
||||
config.skip_count_tracking = choice != "n"
|
||||
|
||||
choice = get_yn_input(MUTE_ADS_PROMPT)
|
||||
config.mute_ads = choice == "y"
|
||||
|
||||
choice = get_yn_input(SKIP_ADS_PROMPT)
|
||||
config.skip_ads = choice == "y"
|
||||
|
||||
choice = get_yn_input(AUTOPLAY_PROMPT)
|
||||
config.auto_play = choice != "n"
|
||||
|
||||
api_server = input(ENTER_API_SERVER_PROMPT)
|
||||
if api_server:
|
||||
config.api_server = api_server
|
||||
|
||||
print("Config finished")
|
||||
config.save()
|
||||
loop.run_until_complete(web_session.close())
|
||||
import asyncio
|
||||
|
||||
import aiohttp
|
||||
|
||||
from . import api_helpers, ytlounge
|
||||
|
||||
# Constants for user input prompts
|
||||
ATVS_REMOVAL_PROMPT = (
|
||||
"Do you want to remove the legacy 'atvs' entry (the app won't start"
|
||||
" with it present)? (y/N) "
|
||||
)
|
||||
PAIRING_CODE_PROMPT = "Enter pairing code (found in Settings - Link with TV code): "
|
||||
ADD_MORE_DEVICES_PROMPT = "Paired with {num_devices} Device(s). Add more? (y/N) "
|
||||
CHANGE_API_KEY_PROMPT = "API key already specified. Change it? (y/N) "
|
||||
ADD_API_KEY_PROMPT = (
|
||||
"API key only needed for the channel whitelist function. Add it? (y/N) "
|
||||
)
|
||||
ENTER_API_KEY_PROMPT = "Enter your API key: "
|
||||
CHANGE_SKIP_CATEGORIES_PROMPT = "Skip categories already specified. Change them? (y/N) "
|
||||
ENTER_SKIP_CATEGORIES_PROMPT = (
|
||||
"Enter skip categories (space or comma sepparated) Options: [sponsor,"
|
||||
" selfpromo, exclusive_access, interaction, poi_highlight, intro, outro,"
|
||||
" preview, filler, music_offtopic]:\n"
|
||||
)
|
||||
WHITELIST_CHANNELS_PROMPT = (
|
||||
"Do you want to whitelist any channels from being ad-blocked? (y/N) "
|
||||
)
|
||||
SEARCH_CHANNEL_PROMPT = 'Enter a channel name or "/exit" to exit: '
|
||||
SELECT_CHANNEL_PROMPT = "Select one option of the above [0-6]: "
|
||||
ENTER_CHANNEL_ID_PROMPT = "Enter a channel ID: "
|
||||
ENTER_CUSTOM_CHANNEL_NAME_PROMPT = "Enter the channel name: "
|
||||
REPORT_SKIPPED_SEGMENTS_PROMPT = (
|
||||
"Do you want to report skipped segments to sponsorblock. Only the segment"
|
||||
" UUID will be sent? (Y/n) "
|
||||
)
|
||||
MUTE_ADS_PROMPT = "Do you want to mute native YouTube ads automatically? (y/N) "
|
||||
SKIP_ADS_PROMPT = "Do you want to skip native YouTube ads automatically? (y/N) "
|
||||
AUTOPLAY_PROMPT = "Do you want to enable autoplay? (Y/n) "
|
||||
|
||||
|
||||
def get_yn_input(prompt):
|
||||
while choice := input(prompt):
|
||||
if choice.lower() in ["y", "n"]:
|
||||
return choice.lower()
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
|
||||
|
||||
async def pair_device():
|
||||
try:
|
||||
lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV")
|
||||
pairing_code = input(PAIRING_CODE_PROMPT)
|
||||
pairing_code = int(
|
||||
pairing_code.replace("-", "").replace(" ", "")
|
||||
) # remove dashes and spaces
|
||||
print("Pairing...")
|
||||
paired = await lounge_controller.pair(pairing_code)
|
||||
if not paired:
|
||||
print("Failed to pair device")
|
||||
return
|
||||
device = {
|
||||
"screen_id": lounge_controller.auth.screen_id,
|
||||
"name": lounge_controller.screen_name,
|
||||
}
|
||||
print(f"Paired device: {device['name']}")
|
||||
return device
|
||||
except Exception as e:
|
||||
print(f"Failed to pair device: {e}")
|
||||
return
|
||||
|
||||
|
||||
def main(config, debug: bool) -> None:
|
||||
print("Welcome to the iSponsorBlockTV cli setup wizard")
|
||||
loop = asyncio.get_event_loop_policy().get_event_loop()
|
||||
web_session = aiohttp.ClientSession()
|
||||
if debug:
|
||||
loop.set_debug(True)
|
||||
asyncio.set_event_loop(loop)
|
||||
if hasattr(config, "atvs"):
|
||||
print(
|
||||
"The atvs config option is deprecated and has stopped working. Please read"
|
||||
" this for more information on how to upgrade to V2:"
|
||||
" \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2"
|
||||
)
|
||||
choice = get_yn_input(ATVS_REMOVAL_PROMPT)
|
||||
if choice == "y":
|
||||
del config["atvs"]
|
||||
|
||||
devices = config.devices
|
||||
choice = get_yn_input(ADD_MORE_DEVICES_PROMPT.format(num_devices=len(devices)))
|
||||
while choice == "y":
|
||||
task = loop.create_task(pair_device())
|
||||
loop.run_until_complete(task)
|
||||
device = task.result()
|
||||
if device:
|
||||
devices.append(device)
|
||||
choice = get_yn_input(ADD_MORE_DEVICES_PROMPT.format(num_devices=len(devices)))
|
||||
config.devices = devices
|
||||
|
||||
apikey = config.apikey
|
||||
if apikey:
|
||||
choice = get_yn_input(CHANGE_API_KEY_PROMPT)
|
||||
if choice == "y":
|
||||
apikey = input(ENTER_API_KEY_PROMPT)
|
||||
else:
|
||||
choice = get_yn_input(ADD_API_KEY_PROMPT)
|
||||
if choice == "y":
|
||||
print(
|
||||
"Get youtube apikey here:"
|
||||
" https://developers.google.com/youtube/registering_an_application"
|
||||
)
|
||||
apikey = input(ENTER_API_KEY_PROMPT)
|
||||
config.apikey = apikey
|
||||
|
||||
skip_categories = config.skip_categories
|
||||
if skip_categories:
|
||||
choice = get_yn_input(CHANGE_SKIP_CATEGORIES_PROMPT)
|
||||
if choice == "y":
|
||||
categories = input(ENTER_SKIP_CATEGORIES_PROMPT)
|
||||
skip_categories = categories.replace(",", " ").split(" ")
|
||||
skip_categories = [
|
||||
x for x in skip_categories if x != ""
|
||||
] # Remove empty strings
|
||||
else:
|
||||
categories = input(ENTER_SKIP_CATEGORIES_PROMPT)
|
||||
skip_categories = categories.replace(",", " ").split(" ")
|
||||
skip_categories = [
|
||||
x for x in skip_categories if x != ""
|
||||
] # Remove empty strings
|
||||
config.skip_categories = skip_categories
|
||||
|
||||
channel_whitelist = config.channel_whitelist
|
||||
choice = get_yn_input(WHITELIST_CHANNELS_PROMPT)
|
||||
if choice == "y":
|
||||
if not apikey:
|
||||
print(
|
||||
"WARNING: You need to specify an API key to use this function,"
|
||||
" otherwise the program will fail to start.\nYou can add one by"
|
||||
" re-running this setup wizard."
|
||||
)
|
||||
api_helper = api_helpers.ApiHelper(config, web_session)
|
||||
while True:
|
||||
channel_info = {}
|
||||
channel = input(SEARCH_CHANNEL_PROMPT)
|
||||
if channel == "/exit":
|
||||
break
|
||||
|
||||
task = loop.create_task(
|
||||
api_helper.search_channels(channel, apikey, web_session)
|
||||
)
|
||||
loop.run_until_complete(task)
|
||||
results = task.result()
|
||||
if len(results) == 0:
|
||||
print("No channels found")
|
||||
continue
|
||||
|
||||
for i, item in enumerate(results):
|
||||
print(f"{i}: {item[1]} - Subs: {item[2]}")
|
||||
print("5: Enter a custom channel ID")
|
||||
print("6: Go back")
|
||||
|
||||
while choice := input(SELECT_CHANNEL_PROMPT):
|
||||
if choice in [str(x) for x in range(7)]:
|
||||
break
|
||||
print("Invalid choice")
|
||||
|
||||
if choice == "5":
|
||||
channel_info["id"] = input(ENTER_CHANNEL_ID_PROMPT)
|
||||
channel_info["name"] = input(ENTER_CUSTOM_CHANNEL_NAME_PROMPT)
|
||||
channel_whitelist.append(channel_info)
|
||||
continue
|
||||
if choice == "6":
|
||||
continue
|
||||
|
||||
channel_info["id"] = results[int(choice)][0]
|
||||
channel_info["name"] = results[int(choice)][1]
|
||||
channel_whitelist.append(channel_info)
|
||||
# Close web session asynchronously
|
||||
|
||||
config.channel_whitelist = channel_whitelist
|
||||
|
||||
choice = get_yn_input(REPORT_SKIPPED_SEGMENTS_PROMPT)
|
||||
config.skip_count_tracking = choice != "n"
|
||||
|
||||
choice = get_yn_input(MUTE_ADS_PROMPT)
|
||||
config.mute_ads = choice == "y"
|
||||
|
||||
choice = get_yn_input(SKIP_ADS_PROMPT)
|
||||
config.skip_ads = choice == "y"
|
||||
|
||||
choice = get_yn_input(AUTOPLAY_PROMPT)
|
||||
config.auto_play = choice != "n"
|
||||
|
||||
print("Config finished")
|
||||
config.save()
|
||||
loop.run_until_complete(web_session.close())
|
||||
|
||||
@@ -2,6 +2,7 @@ userAgent = "iSponsorBlockTV/0.1"
|
||||
SponsorBlock_service = "youtube"
|
||||
SponsorBlock_actiontype = "skip"
|
||||
|
||||
SponsorBlock_api = "https://sponsor.ajay.app/api/"
|
||||
Youtube_api = "https://www.googleapis.com/youtube/v3/"
|
||||
|
||||
skip_categories = (
|
||||
@@ -19,4 +20,5 @@ skip_categories = (
|
||||
|
||||
youtube_client_blacklist = ["TVHTML5_FOR_KIDS"]
|
||||
|
||||
|
||||
config_file_blacklist_keys = ["config_file", "data_dir"]
|
||||
|
||||
@@ -9,6 +9,7 @@ from appdirs import user_data_dir
|
||||
|
||||
from . import config_setup, main, setup_wizard
|
||||
from .constants import config_file_blacklist_keys
|
||||
from .service.service_helpers import service
|
||||
|
||||
|
||||
class Device:
|
||||
@@ -42,7 +43,6 @@ class Config:
|
||||
self.mute_ads = False
|
||||
self.skip_ads = False
|
||||
self.auto_play = True
|
||||
self.api_server = "https://sponsor.ajay.app"
|
||||
self.__load()
|
||||
|
||||
def validate(self):
|
||||
@@ -214,6 +214,8 @@ pyapp_group.add_command(
|
||||
if os.getenv("PYAPP"):
|
||||
cli.add_command(pyapp_group)
|
||||
|
||||
cli.add_command(service)
|
||||
|
||||
|
||||
def app_start():
|
||||
cli(obj={})
|
||||
|
||||
@@ -1,57 +0,0 @@
|
||||
import os
|
||||
import plistlib
|
||||
|
||||
from . import config_setup
|
||||
|
||||
"""Not updated to V2 yet, should still work. Here be dragons"""
|
||||
default_plist = {
|
||||
"Label": "com.dmunozv04iSponsorBlockTV",
|
||||
"RunAtLoad": True,
|
||||
"StartInterval": 20,
|
||||
"EnvironmentVariables": {"PYTHONUNBUFFERED": "YES"},
|
||||
"StandardErrorPath": "", # Fill later
|
||||
"StandardOutPath": "",
|
||||
"ProgramArguments": "",
|
||||
"WorkingDirectory": "",
|
||||
}
|
||||
|
||||
|
||||
def create_plist(path):
|
||||
plist = default_plist
|
||||
plist["ProgramArguments"] = [path + "/iSponsorBlockTV-macos"]
|
||||
plist["StandardErrorPath"] = path + "/iSponsorBlockTV.error.log"
|
||||
plist["StandardOutPath"] = path + "/iSponsorBlockTV.out.log"
|
||||
plist["WorkingDirectory"] = path
|
||||
launchd_path = os.path.expanduser("~/Library/LaunchAgents/")
|
||||
path_to_save = launchd_path + "com.dmunozv04.iSponsorBlockTV.plist"
|
||||
|
||||
with open(path_to_save, "wb") as fp:
|
||||
plistlib.dump(plist, fp)
|
||||
|
||||
|
||||
def run_setup(file):
|
||||
config = {}
|
||||
config_setup.main(config, file, debug=False)
|
||||
|
||||
|
||||
def main():
|
||||
correct_path = os.path.expanduser("~/iSponsorBlockTV")
|
||||
if os.path.isfile(correct_path + "/iSponsorBlockTV-macos"):
|
||||
print("Program is on the right path")
|
||||
print("The launch daemon will now be installed")
|
||||
create_plist(correct_path)
|
||||
run_setup(correct_path + "/config.json")
|
||||
print(
|
||||
"Launch daemon installed. Please restart the computer to enable it or"
|
||||
" use:\n launchctl load"
|
||||
" ~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist"
|
||||
)
|
||||
else:
|
||||
if not os.path.exists(correct_path):
|
||||
os.makedirs(correct_path)
|
||||
print(
|
||||
"Please move the program to the correct path: "
|
||||
+ correct_path
|
||||
+ "opening now on finder..."
|
||||
)
|
||||
os.system("open -R " + correct_path)
|
||||
0
src/iSponsorBlockTV/service/__init__.py
Normal file
0
src/iSponsorBlockTV/service/__init__.py
Normal file
76
src/iSponsorBlockTV/service/service_helpers.py
Normal file
76
src/iSponsorBlockTV/service/service_helpers.py
Normal file
@@ -0,0 +1,76 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
import rich_click as click
|
||||
|
||||
from .service_managers import select_service_manager
|
||||
|
||||
|
||||
@click.group()
|
||||
@click.pass_context
|
||||
def service(ctx):
|
||||
"""Manage the program as a service (executable only)"""
|
||||
ctx.ensure_object(dict)
|
||||
if os.getenv("PYAPP") is None:
|
||||
print(
|
||||
"Service commands are only available in the executable version of the"
|
||||
" program"
|
||||
)
|
||||
sys.exit(1)
|
||||
ctx.obj["service_manager"] = select_service_manager()(os.getenv("PYAPP"))
|
||||
|
||||
|
||||
@service.command()
|
||||
@click.pass_context
|
||||
def start(ctx):
|
||||
"""Start the service"""
|
||||
ctx.obj["service_manager"].start()
|
||||
|
||||
|
||||
@service.command()
|
||||
@click.pass_context
|
||||
def stop(ctx):
|
||||
"""Stop the service"""
|
||||
ctx.obj["service_manager"].stop()
|
||||
|
||||
|
||||
@service.command()
|
||||
@click.pass_context
|
||||
def restart(ctx):
|
||||
"""Restart the service"""
|
||||
ctx.obj["service_manager"].restart()
|
||||
|
||||
|
||||
@service.command()
|
||||
@click.pass_context
|
||||
def status(ctx):
|
||||
"""Get the status of the service"""
|
||||
ctx.obj["service_manager"].status()
|
||||
|
||||
|
||||
@service.command()
|
||||
@click.pass_context
|
||||
def install(ctx):
|
||||
"""Install the service"""
|
||||
ctx.obj["service_manager"].install()
|
||||
|
||||
|
||||
@service.command()
|
||||
@click.pass_context
|
||||
def uninstall(ctx):
|
||||
"""Uninstall the service"""
|
||||
ctx.obj["service_manager"].uninstall()
|
||||
|
||||
|
||||
@service.command()
|
||||
@click.pass_context
|
||||
def enable(ctx):
|
||||
"""Enable the service"""
|
||||
ctx.obj["service_manager"].enable()
|
||||
|
||||
|
||||
@service.command()
|
||||
@click.pass_context
|
||||
def disable(ctx):
|
||||
"""Disable the service"""
|
||||
ctx.obj["service_manager"].disable()
|
||||
107
src/iSponsorBlockTV/service/service_managers.py
Normal file
107
src/iSponsorBlockTV/service/service_managers.py
Normal file
@@ -0,0 +1,107 @@
|
||||
import os
|
||||
import plistlib
|
||||
import subprocess
|
||||
from platform import system
|
||||
|
||||
from appdirs import user_log_dir
|
||||
|
||||
|
||||
def select_service_manager() -> "ServiceManager":
|
||||
platform = system()
|
||||
if platform == "Darwin":
|
||||
return Launchd
|
||||
elif platform == "Linux":
|
||||
return Systemd
|
||||
else:
|
||||
raise NotImplementedError("Unsupported platform")
|
||||
|
||||
|
||||
class ServiceManager:
|
||||
def __init__(self, executable_path, *args, **kwargs):
|
||||
self.executable_path = executable_path
|
||||
|
||||
def start(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
def restart(self):
|
||||
pass
|
||||
|
||||
def status(self):
|
||||
pass
|
||||
|
||||
def install(self):
|
||||
pass
|
||||
|
||||
def uninstall(self):
|
||||
pass
|
||||
|
||||
def enable(self):
|
||||
pass
|
||||
|
||||
def disable(self):
|
||||
pass
|
||||
|
||||
|
||||
class Launchd(ServiceManager):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.service_name = "com.dmunozv04.iSponsorBlockTV"
|
||||
self.service_path = (
|
||||
os.path.expanduser("~/Library/LaunchAgents/") + self.service_name + ".plist"
|
||||
)
|
||||
|
||||
def start(self):
|
||||
subprocess.run(["launchctl", "start", self.service_name])
|
||||
|
||||
def stop(self):
|
||||
subprocess.run(["launchctl", "stop", self.service_name])
|
||||
|
||||
def restart(self):
|
||||
subprocess.run(["launchctl", "restart", self.service_name])
|
||||
|
||||
def status(self):
|
||||
subprocess.run(["launchctl", "list", self.service_name])
|
||||
|
||||
def install(self):
|
||||
if os.path.exists(self.service_path):
|
||||
print("Service already installed")
|
||||
return
|
||||
logs_dir = user_log_dir("iSponsorBlockTV", "dmunozv04")
|
||||
# ensure the logs directory exists
|
||||
os.makedirs(logs_dir, exist_ok=True)
|
||||
plist = {
|
||||
"Label": "com.dmunozv04.iSponsorBlockTV",
|
||||
"RunAtLoad": True,
|
||||
"StartInterval": 20,
|
||||
"EnvironmentVariables": {"PYTHONUNBUFFERED": "YES"},
|
||||
"StandardErrorPath": logs_dir + "/iSponsorBlockTV.err",
|
||||
"StandardOutPath": logs_dir + "/iSponsorBlockTV.out",
|
||||
"Program": self.executable_path,
|
||||
}
|
||||
with open(self.service_path, "wb") as fp:
|
||||
plistlib.dump(plist, fp)
|
||||
print("Service installed")
|
||||
self.enable()
|
||||
|
||||
def uninstall(self):
|
||||
self.disable()
|
||||
# Remove the file
|
||||
try:
|
||||
os.remove(self.service_path)
|
||||
print("Service uninstalled")
|
||||
except FileNotFoundError:
|
||||
print("Service not found")
|
||||
|
||||
def enable(self):
|
||||
subprocess.run(["launchctl", "load", self.service_path])
|
||||
|
||||
def disable(self):
|
||||
subprocess.run(["launchctl", "stop", self.service_name])
|
||||
subprocess.run(["launchctl", "unload", self.service_path])
|
||||
|
||||
|
||||
class Systemd(ServiceManager):
|
||||
pass
|
||||
@@ -876,31 +876,6 @@ class AutoPlayManager(Vertical):
|
||||
self.config.auto_play = event.checkbox.value
|
||||
|
||||
|
||||
class ApiServerManager(Vertical):
|
||||
"""Manager for the custom API server URL."""
|
||||
|
||||
def __init__(self, config, **kwargs) -> None:
|
||||
super().__init__(**kwargs)
|
||||
self.config = config
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Label("Custom API Server", classes="title")
|
||||
yield Label(
|
||||
"You can specify a custom SponsorBlock API server URL here.",
|
||||
classes="subtitle",
|
||||
)
|
||||
with Grid(id="api-server-grid"):
|
||||
yield Input(
|
||||
placeholder="Custom API Server URL",
|
||||
id="api-server-input",
|
||||
value=self.config.api_server,
|
||||
)
|
||||
|
||||
@on(Input.Changed, "#api-server-input")
|
||||
def changed_api_server(self, event: Input.Changed):
|
||||
self.config.api_server = event.input.value
|
||||
|
||||
|
||||
class ISponsorBlockTVSetupMainScreen(Screen):
|
||||
"""Making this a separate screen to avoid a bug: https://github.com/Textualize/textual/issues/3221"""
|
||||
|
||||
@@ -940,9 +915,6 @@ class ISponsorBlockTVSetupMainScreen(Screen):
|
||||
yield AutoPlayManager(
|
||||
config=self.config, id="autoplay-manager", classes="container"
|
||||
)
|
||||
yield ApiServerManager(
|
||||
config=self.config, id="api-server-manager", classes="container"
|
||||
)
|
||||
|
||||
def on_mount(self) -> None:
|
||||
if self.check_for_old_config_entries():
|
||||
|
||||
@@ -35,7 +35,6 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
|
||||
self.mute_ads = config.mute_ads
|
||||
self.skip_ads = config.skip_ads
|
||||
self.auto_play = config.auto_play
|
||||
self._command_mutex = asyncio.Lock()
|
||||
|
||||
# Ensures that we still are subscribed to the lounge
|
||||
async def _watchdog(self):
|
||||
@@ -146,12 +145,9 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
|
||||
self.shorts_disconnected = False
|
||||
create_task(self.play_video(video_id_saved))
|
||||
elif event_type == "loungeScreenDisconnected":
|
||||
if args: # Sometimes it's empty
|
||||
data = args[0]
|
||||
if (
|
||||
data["reason"] == "disconnectedByUserScreenInitiated"
|
||||
): # Short playing?
|
||||
self.shorts_disconnected = True
|
||||
data = args[0]
|
||||
if data["reason"] == "disconnectedByUserScreenInitiated": # Short playing?
|
||||
self.shorts_disconnected = True
|
||||
elif event_type == "onAutoplayModeChanged":
|
||||
create_task(self.set_auto_play_mode(self.auto_play))
|
||||
|
||||
@@ -185,9 +181,3 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
|
||||
|
||||
async def play_video(self, video_id: str) -> bool:
|
||||
return await self._command("setPlaylist", {"videoId": video_id})
|
||||
|
||||
# Test to wrap the command function in a mutex to avoid race conditions with
|
||||
# the _command_offset (TODO: move to upstream if it works)
|
||||
async def _command(self, command: str, command_parameters: dict = None) -> bool:
|
||||
async with self._command_mutex:
|
||||
return await super()._command(command, command_parameters)
|
||||
|
||||
Reference in New Issue
Block a user