Compare commits

..

1 Commits

Author SHA1 Message Date
dmunozv04
93f4f0f752 Connect without playlist 2025-04-15 14:03:02 +02:00
17 changed files with 147 additions and 382 deletions

View File

@@ -6,8 +6,7 @@ on:
branches:
- '*'
tags:
- 'v*.*.*'
- 'v*.*.*-*'
- 'v*'
pull_request:
branches:
- '*'
@@ -28,7 +27,7 @@ jobs:
steps:
# Get the repository's code
- name: Checkout
uses: actions/checkout@v6
uses: actions/checkout@v4
# Generate docker tags
- name: Docker meta
@@ -41,9 +40,7 @@ jobs:
tags: |
type=raw,value=develop,priority=900,enable=${{ github.ref == format('refs/heads/{0}', github.event.repository.default_branch) }}
type=ref,enable=true,priority=600,prefix=pr-,suffix=,event=pr
type=semver,pattern=v{{version}}
type=semver,pattern=v{{major}}
type=semver,pattern=v{{major}}.{{minor}}
type=ref,event=tag
type=ref,event=branch
type=schedule
@@ -56,7 +53,7 @@ jobs:
uses: docker/setup-buildx-action@v3
- name: Login to DockerHub
if: github.event_name != 'pull_request' && env.DOCKERHUB_USERNAME != ''
if: github.event_name != 'pull_request'
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
@@ -74,7 +71,7 @@ jobs:
uses: docker/build-push-action@v6
with:
context: .
platforms: linux/amd64, linux/arm64, linux/arm/v7, linux/386, linux/arm/v6
platforms: linux/amd64, linux/arm64, linux/arm/v7
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}

View File

@@ -11,9 +11,12 @@ name: Release Package
on:
push:
branches:
- 'main'
- '*'
tags:
- 'v*'
pull_request:
branches:
- '*'
release:
types: [published]
@@ -30,10 +33,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v6
uses: actions/checkout@v4
- name: Set up Python ${{ env.PYTHON_VERSION }}
uses: actions/setup-python@v6
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
@@ -46,7 +49,7 @@ jobs:
run: python -m hatch build
- name: Upload artifact
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v4
with:
name: sdist-and-wheel
path: dist/*
@@ -100,17 +103,17 @@ jobs:
CARGO_BUILD_TARGET: ${{ matrix.job.target }}
PYAPP_DISTRIBUTION_VARIANT_CPU: ${{ matrix.job.cpu_variant }}
PYAPP_REPO: pyapp # Use local copy of pyapp (needed for cross-compiling)
PYAPP_VERSION: v0.28.0
PYAPP_VERSION: v0.27.0
steps:
- name: Checkout
uses: actions/checkout@v6
uses: actions/checkout@v4
- name: Clone PyApp
run: git clone --depth 1 --branch $PYAPP_VERSION https://github.com/ofek/pyapp $PYAPP_REPO
- name: Set up Python ${{ env.PYTHON_VERSION }}
uses: actions/setup-python@v6
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
@@ -141,7 +144,7 @@ jobs:
hatch --version
- name: Get artifact
uses: actions/download-artifact@v6
uses: actions/download-artifact@v4
with:
name: sdist-and-wheel
path: ${{ github.workspace }}/dist
@@ -159,13 +162,12 @@ jobs:
mv dist/binary/iSponsorBlockTV* dist/binary/iSponsorBlockTV-${{ matrix.job.release_suffix }}
- name: Attest build provenance
uses: actions/attest-build-provenance@v3
uses: actions/attest-build-provenance@v2
with:
subject-path: dist/binary/*
continue-on-error: true # Continue if attestation fails (it will fail on forks)
- name: Upload built binary package
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v4
with:
name: binaries-${{ matrix.job.release_suffix }}
path: dist/binary/*
@@ -181,7 +183,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Get artifact
uses: actions/download-artifact@v6
uses: actions/download-artifact@v4
with:
name: sdist-and-wheel
path: dist
@@ -200,7 +202,7 @@ jobs:
if: github.event_name == 'release' && github.event.action == 'published'
runs-on: ubuntu-latest
steps:
- uses: actions/download-artifact@v6
- uses: actions/download-artifact@v4
name: Get artifact
with:
path: dist

View File

@@ -18,11 +18,11 @@ jobs:
steps:
# Get the repository's code
- name: Checkout
uses: actions/checkout@v6
uses: actions/checkout@v4
# Update description
- name: Update repo description
uses: peter-evans/dockerhub-description@v5
uses: peter-evans/dockerhub-description@v4
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}

View File

@@ -3,7 +3,7 @@
# Inspired by textual pre-commit config
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v6.0.0
rev: v5.0.0
hooks:
- id: check-ast # simply checks whether the files parse as valid python
- id: check-builtin-literals # requires literal syntax when initializing empty or zero python builtin types
@@ -19,13 +19,13 @@ repos:
- id: mixed-line-ending # replaces or checks mixed line ending
- id: trailing-whitespace # checks for trailing whitespace
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.12.12
rev: v0.11.4
hooks:
- id: ruff
args: [ --fix, --exit-non-zero-on-fix ]
- id: ruff-format
- repo: https://github.com/igorshubovych/markdownlint-cli
rev: v0.45.0
rev: v0.44.0
hooks:
- id: markdownlint
args: ["--fix"]

View File

@@ -5,11 +5,10 @@
[![GitHub Release](https://img.shields.io/github/v/release/dmunozv04/isponsorblocktv?logo=GitHub&style=flat)](https://github.com/dmunozv04/iSponsorBlockTV/releases/latest)
[![GitHub Repo stars](https://img.shields.io/github/stars/dmunozv04/isponsorblocktv?style=flat)](https://github.com/dmunozv04/isponsorblocktv)
iSponsorBlockTV is a self-hosted application that connects to your YouTube TV
app (see compatibility below) and automatically skips segments (like Sponsors
or intros) in YouTube videos using the [SponsorBlock](https://sponsor.ajay.app/)
API. It can also auto mute and press the "Skip Ad" button the moment it becomes
available on YouTube ads.
Skip sponsor segments in YouTube videos playing on a YouTube TV device (see
below for compatibility details).
This project is written in asynchronous python and should be pretty quick.
## Installation
@@ -23,7 +22,7 @@ Open an issue/pull request if you have tested a device that isn't listed here.
| Device | Status |
|:-------------------|:------:|
| Apple TV | ✅* |
| Apple TV | ✅ |
| Samsung TV (Tizen) | ✅ |
| LG TV (WebOS) | ✅ |
| Android TV | ✅ |
@@ -36,22 +35,17 @@ Open an issue/pull request if you have tested a device that isn't listed here.
| Xbox One/Series | ✅ |
| Playstation 4/5 | ✅ |
*Ad muting won't work when using AirPlay to send the audio to another speaker.
** Shorts aren't fully supported due to limitations on YouTube's side.
A single short can be seen by either selecting the "Disconnect" option in the
warning shown
or by long pressing the thumbnail to open the menu and clicking play from there
## Usage
Run iSponsorBlockTV on a computer that has network access. It doesn't need to
be on the same network as the device, only access to youtube.com is required.
Run iSponsorBlockTV on a computer that has network access.
Auto discovery will require the computer to be on the same network as the device
during setup.
The device can also be manually added to iSponsorBlockTV with a YouTube TV code.
This code can be found in the settings page of your YouTube TV application.
This code can be found in the settings page of your YouTube application.
It connects to the device, watches its activity and skips any sponsor segment
using the [SponsorBlock](https://sponsor.ajay.app/) API.
It can also skip/mute YouTube ads.
## Libraries used
@@ -77,9 +71,11 @@ This code can be found in the settings page of your YouTube TV application.
## Contributors
[![Contributors](https://contrib.rocks/image?repo=dmunozv04/iSponsorBlockTV)](https://github.com/dmunozv04/iSponsorBlockTV/graphs/contributors)
Made with [contrib.rocks](https://contrib.rocks).
- [dmunozv04](https://github.com/dmunozv04) - creator and maintainer
- [HaltCatchFire](https://github.com/HaltCatchFire) - updated dependencies and
improved skip logic
- [Oxixes](https://github.com/oxixes) - added support for channel whitelist and
minor improvements
## License

View File

@@ -12,7 +12,6 @@
"skip_count_tracking": true,
"mute_ads": true,
"skip_ads": true,
"minimum_skip_length": 1,
"auto_play": true,
"join_name": "iSponsorBlockTV",
"apikey": "",
@@ -20,6 +19,5 @@
{"id": "",
"name": ""
}
],
"use_proxy": false
]
}

View File

@@ -1,7 +1,8 @@
version: '3.3'
services:
iSponsorBlockTV:
image: ghcr.io/dmunozv04/isponsorblocktv
container_name: iSponsorBlockTV
restart: unless-stopped
volumes:
- /PATH_TO_YOUR_DATA_DIR:/app/data
iSponsorBlockTV:
image: ghcr.io/dmunozv04/isponsorblocktv
container_name: iSponsorBlockTV
restart: unless-stopped
volumes:
- /PATH_TO_YOUR_DATA_DIR:/app/data

View File

@@ -1,6 +1,6 @@
[project]
name = "iSponsorBlockTV"
version = "2.6.1"
version = "2.4.0"
authors = [
{"name" = "dmunozv04"}
]

View File

@@ -1,10 +1,10 @@
aiohttp==3.12.15
aiohttp==3.11.16
appdirs==1.4.4
async-cache==1.1.1
pyytlounge==2.3.0
rich==14.1.0
ssdp==1.3.1
textual==5.3.0
rich==14.0.0
ssdp==1.3.0
textual==2.1.2
textual-slider==0.2.0
xmltodict==0.15.1
rich_click==1.8.9
xmltodict==0.14.2
rich_click==1.8.8

View File

@@ -27,7 +27,6 @@ class ApiHelper:
self.skip_count_tracking = config.skip_count_tracking
self.web_session = web_session
self.num_devices = len(config.devices)
self.minimum_skip_length = config.minimum_skip_length
# Not used anymore, maybe it can stay here a little longer
@AsyncLRU(maxsize=10)
@@ -140,10 +139,10 @@ class ApiHelper:
if str(i["videoID"]) == str(vid_id):
response_json = i
break
return self.process_segments(response_json, self.minimum_skip_length)
return self.process_segments(response_json)
@staticmethod
def process_segments(response, minimum_skip_length):
def process_segments(response):
segments = []
ignore_ttl = True
try:
@@ -185,9 +184,7 @@ class ApiHelper:
segment_dict["start"] = segment_before_start
segment_dict["UUID"].extend(segment_before_UUID)
segments.pop()
# Only add segments greater than minimum skip length
if segment_dict["end"] - segment_dict["start"] > minimum_skip_length:
segments.append(segment_dict)
segments.append(segment_dict)
except BaseException:
pass
return segments, ignore_ttl

View File

@@ -5,7 +5,6 @@ import aiohttp
from . import api_helpers, ytlounge
# Constants for user input prompts
USE_PROXY_PROMPT = "Do you want to use system-wide proxy? (y/N)"
ATVS_REMOVAL_PROMPT = (
"Do you want to remove the legacy 'atvs' entry (the app won't start with it present)? (y/N) "
)
@@ -25,10 +24,6 @@ SEARCH_CHANNEL_PROMPT = 'Enter a channel name or "/exit" to exit: '
SELECT_CHANNEL_PROMPT = "Select one option of the above [0-6]: "
ENTER_CHANNEL_ID_PROMPT = "Enter a channel ID: "
ENTER_CUSTOM_CHANNEL_NAME_PROMPT = "Enter the channel name: "
MINIMUM_SKIP_PROMPT = "Do you want to specify a minimum length of segment to skip? (y/N)"
MINIMUM_SKIP_SPECIFICATION_PROMPT = (
"Enter minimum length of segment to skip in seconds (enter 0 to disable):"
)
REPORT_SKIPPED_SEGMENTS_PROMPT = (
"Do you want to report skipped segments to sponsorblock. Only the segment"
" UUID will be sent? (Y/n) "
@@ -46,8 +41,8 @@ def get_yn_input(prompt):
return None
async def create_web_session(use_proxy):
return aiohttp.ClientSession(trust_env=use_proxy)
async def create_web_session():
return aiohttp.ClientSession()
async def pair_device(web_session: aiohttp.ClientSession):
@@ -76,12 +71,8 @@ async def pair_device(web_session: aiohttp.ClientSession):
def main(config, debug: bool) -> None:
print("Welcome to the iSponsorBlockTV cli setup wizard")
choice = get_yn_input(USE_PROXY_PROMPT)
config.use_proxy = choice == "y"
loop = asyncio.get_event_loop_policy().get_event_loop()
web_session = loop.run_until_complete(create_web_session(config.use_proxy))
web_session = loop.run_until_complete(create_web_session())
if debug:
loop.set_debug(True)
asyncio.set_event_loop(loop)
@@ -180,21 +171,6 @@ def main(config, debug: bool) -> None:
config.channel_whitelist = channel_whitelist
# Ask for minimum skip length. Confirm input is an integer
minimum_skip_length = config.minimum_skip_length
choice = get_yn_input(MINIMUM_SKIP_PROMPT)
if choice == "y":
while True:
try:
minimum_skip_length = int(input(MINIMUM_SKIP_SPECIFICATION_PROMPT))
break
except ValueError:
print("You entered a non integer value, try again.")
continue
config.minimum_skip_length = minimum_skip_length
choice = get_yn_input(REPORT_SKIPPED_SEGMENTS_PROMPT)
config.skip_count_tracking = choice != "n"

View File

@@ -1,25 +0,0 @@
class AiohttpTracer:
def __init__(self, logger):
self.logger = logger
async def on_request_start(self, session, context, params):
self.logger.debug(f"Request started ({id(context):#x}): {params.method} {params.url}")
async def on_request_end(self, session, context, params):
self.logger.debug(f"Request ended ({id(context):#x}): {params.response.status}")
async def on_request_exception(self, session, context, params):
self.logger.debug(f"Request exception ({id(context):#x}): {params.exception}")
async def on_response_chunk_received(self, session, context, params):
chunk_size = len(params.chunk)
try:
# Try to decode as text
text = params.chunk.decode("utf-8")
self.logger.debug(f"Response chunk ({id(context):#x}) {chunk_size} bytes: {text}")
except UnicodeDecodeError:
# If not valid UTF-8, show as hex
hex_data = params.chunk.hex()
self.logger.debug(
f"Response chunk ({id(context):#x}) ({chunk_size} bytes) [HEX]: {hex_data}"
)

View File

@@ -41,10 +41,8 @@ class Config:
self.skip_count_tracking = True
self.mute_ads = False
self.skip_ads = False
self.minimum_skip_length = 1
self.auto_play = True
self.join_name = "iSponsorBlockTV"
self.use_proxy = False
self.__load()
def validate(self):
@@ -132,7 +130,6 @@ class Config:
help="data directory",
)
@click.option("--debug", is_flag=True, help="debug mode")
@click.option("--http-tracing", is_flag=True, help="Enable HTTP request/response tracing")
# legacy commands as arguments
@click.option("--setup", is_flag=True, help="Setup the program graphically", hidden=True)
@click.option(
@@ -142,12 +139,11 @@ class Config:
hidden=True,
)
@click.pass_context
def cli(ctx, data, debug, http_tracing, setup, setup_cli):
def cli(ctx, data, debug, setup, setup_cli):
"""iSponsorblockTV"""
ctx.ensure_object(dict)
ctx.obj["data_dir"] = data
ctx.obj["debug"] = debug
ctx.obj["http_tracing"] = http_tracing
logger = logging.getLogger()
ctx.obj["logger"] = logger
@@ -192,7 +188,7 @@ def start(ctx):
"""Start the main program"""
config = Config(ctx.obj["data_dir"])
config.validate()
main.main(config, ctx.obj["debug"], ctx.obj["http_tracing"])
main.main(config, ctx.obj["debug"])
# Create fake "self" group to show pyapp options in help menu

View File

@@ -7,7 +7,6 @@ from typing import Optional
import aiohttp
from . import api_helpers, ytlounge
from .debug_helpers import AiohttpTracer
class DeviceListener:
@@ -154,30 +153,14 @@ def handle_signal(signum, frame):
raise KeyboardInterrupt()
async def main_async(config, debug, http_tracing):
async def main_async(config, debug):
loop = asyncio.get_event_loop_policy().get_event_loop()
tasks = [] # Save the tasks so the interpreter doesn't garbage collect them
devices = [] # Save the devices to close them later
if debug:
loop.set_debug(True)
tcp_connector = aiohttp.TCPConnector(ttl_dns_cache=300)
# Configure session with tracing if enabled
if http_tracing:
root_logger = logging.getLogger("aiohttp_trace")
tracer = AiohttpTracer(root_logger)
trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(tracer.on_request_start)
trace_config.on_response_chunk_received.append(tracer.on_response_chunk_received)
trace_config.on_request_end.append(tracer.on_request_end)
trace_config.on_request_exception.append(tracer.on_request_exception)
web_session = aiohttp.ClientSession(
trust_env=config.use_proxy, connector=tcp_connector, trace_configs=[trace_config]
)
else:
web_session = aiohttp.ClientSession(trust_env=config.use_proxy, connector=tcp_connector)
web_session = aiohttp.ClientSession(connector=tcp_connector)
api_helper = api_helpers.ApiHelper(config, web_session)
for i in config.devices:
device = DeviceListener(api_helper, config, i, debug, web_session)
@@ -201,5 +184,5 @@ async def main_async(config, debug, http_tracing):
print("Exited")
def main(config, debug, http_tracing):
asyncio.run(main_async(config, debug, http_tracing))
def main(config, debug):
asyncio.run(main_async(config, debug))

View File

@@ -383,9 +383,3 @@ MigrationScreen {
padding: 1;
height: auto;
}
/* Use Proxy */
#useproxy-container{
padding: 1;
height: auto;
}

View File

@@ -13,7 +13,6 @@ from textual.containers import (
ScrollableContainer,
Vertical,
)
from textual.css.query import NoMatches
from textual.events import Click
from textual.screen import Screen
from textual.validation import Function
@@ -234,7 +233,7 @@ class AddDevice(ModalWithClickExit):
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
self.web_session = aiohttp.ClientSession(trust_env=config.use_proxy)
self.web_session = aiohttp.ClientSession()
self.api_helper = api_helpers.ApiHelper(config, self.web_session)
self.devices_discovered_dial = []
@@ -302,11 +301,7 @@ class AddDevice(ModalWithClickExit):
async def task_discover_devices(self):
devices_found = await self.api_helper.discover_youtube_devices_dial()
try:
list_widget: SelectionList = self.query_one("#dial-devices-list")
except NoMatches:
# The widget was not found, probably the screen was dismissed
return
list_widget: SelectionList = self.query_one("#dial-devices-list")
list_widget.clear_options()
if devices_found:
# print(devices_found)
@@ -341,7 +336,7 @@ class AddDevice(ModalWithClickExit):
pairing_code = int(
pairing_code.replace("-", "").replace(" ", "")
) # remove dashes and spaces
device_name = self.query_one("#device-name-input").value
device_name = self.parent.query_one("#device-name-input").value
paired = False
try:
paired = await lounge_controller.pair(pairing_code)
@@ -387,7 +382,7 @@ class AddChannel(ModalWithClickExit):
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
web_session = aiohttp.ClientSession(trust_env=config.use_proxy)
web_session = aiohttp.ClientSession()
self.api_helper = api_helpers.ApiHelper(config, web_session)
def compose(self) -> ComposeResult:
@@ -664,7 +659,7 @@ class ApiKeyManager(Vertical):
@on(Button.Pressed, "#api-key-view")
def pressed_api_key_view(self, event: Button.Pressed):
if "Show" in str(event.button.label):
if "Show" in event.button.label:
event.button.label = "Hide key"
self.query_one("#api-key-input").password = False
else:
@@ -697,43 +692,6 @@ class SkipCategoriesManager(Vertical):
self.config.skip_categories = event.selection_list.selected
class MinimumSkipLengthManager(Vertical):
"""Manager for minimum skip length setting."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
def compose(self) -> ComposeResult:
yield Label("Minimum Skip Length", classes="title")
yield Label(
(
"Specify the minimum length a segment must meet in order to skip "
"it (in seconds). Default is 1 second; entering 0 will skip all "
"segments."
),
classes="subtitle",
)
yield Input(
placeholder="Minimum skip length (0 to skip all)",
id="minimum-skip-length-input",
value=str(getattr(self.config, "minimum_skip_length", 1)),
validators=[
Function(
lambda user_input: user_input.isdigit(),
"Please enter a valid non-negative number",
)
],
)
@on(Input.Changed, "#minimum-skip-length-input")
def changed_minimum_skip_length(self, event: Input.Changed):
try:
self.config.minimum_skip_length = int(event.input.value)
except ValueError:
self.config.minimum_skip_length = 1
class SkipCountTrackingManager(Vertical):
"""Manager for skip count tracking, allows to enable/disable skip count tracking."""
@@ -825,7 +783,10 @@ class ChannelWhitelistManager(Vertical):
id="channel-whitelist-subtitle",
)
yield Label(
("⚠️ [#FF0000]You need to set your YouTube Api Key in order to use this feature"),
(
":warning: [#FF0000]You need to set your YouTube Api Key in order to"
" use this feature"
),
id="warning-no-key",
)
with Horizontal(id="add-channel-button-container"):
@@ -892,45 +853,11 @@ class AutoPlayManager(Vertical):
self.config.auto_play = event.checkbox.value
class UseProxyManager(Vertical):
"""Manager for proxy use, allows enabling/disabling use of proxy."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
def compose(self) -> ComposeResult:
yield Label("Use proxy", classes="title")
yield Label(
"This feature allows application to use system proxy,"
" if it is set in environment variables."
" This parameter will be passed in all [i]aiohttp.ClientSession[/i]"
' calls. For further information, see "[i]trust_env[/i]" section at'
" [link='https://docs.aiohttp.org/en/stable/client_reference.html']"
"aiohttp documentation[/link].",
classes="subtitle",
id="useproxy-subtitle",
)
with Horizontal(id="useproxy-container"):
yield Checkbox(
value=self.config.use_proxy,
id="useproxy-switch",
label="Use proxy",
)
@on(Checkbox.Changed, "#useproxy-switch")
def changed_skip(self, event: Checkbox.Changed):
self.config.use_proxy = event.checkbox.value
class ISponsorBlockTVSetup(App):
class ISponsorBlockTVSetupMainScreen(Screen):
TITLE = "iSponsorBlockTV"
SUB_TITLE = "Setup Wizard"
BINDINGS = [("q,ctrl+c", "exit_modal", "Exit"), ("s", "save", "Save")]
AUTO_FOCUS = None
CSS_PATH = ( # tcss is the recommended extension for textual css files
"setup-wizard-style.tcss"
)
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
@@ -946,11 +873,6 @@ class ISponsorBlockTVSetup(App):
yield SkipCategoriesManager(
config=self.config, id="skip-categories-manager", classes="container"
)
yield MinimumSkipLengthManager(
config=self.config,
id="minimum-skip-length-manager",
classes="container",
)
yield SkipCountTrackingManager(
config=self.config, id="count-segments-manager", classes="container"
)
@@ -962,7 +884,6 @@ class ISponsorBlockTVSetup(App):
)
yield ApiKeyManager(config=self.config, id="api-key-manager", classes="container")
yield AutoPlayManager(config=self.config, id="autoplay-manager", classes="container")
yield UseProxyManager(config=self.config, id="useproxy-manager", classes="container")
def on_mount(self) -> None:
if self.check_for_old_config_entries():
@@ -986,13 +907,36 @@ class ISponsorBlockTVSetup(App):
@on(Input.Changed, "#api-key-input")
def changed_api_key(self, event: Input.Changed):
try: # ChannelWhitelist might not be mounted
self.app.query_one("#warning-no-key").display = bool(
(not event.input.value) and self.config.channel_whitelist
)
except NoMatches:
# Show if no api key is set and at least one channel is in the whitelist
self.app.query_one("#warning-no-key").display = (
not event.input.value
) and self.config.channel_whitelist
except BaseException:
pass
class ISponsorBlockTVSetup(App):
CSS_PATH = ( # tcss is the recommended extension for textual css files
"setup-wizard-style.tcss"
)
# Bindings for the whole app here, so they are available in all screens
BINDINGS = [("q,ctrl+c", "exit_modal", "Exit"), ("s", "save", "Save")]
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
self.main_screen = ISponsorBlockTVSetupMainScreen(config=self.config)
def on_mount(self) -> None:
self.push_screen(self.main_screen)
def action_save(self) -> None:
self.main_screen.action_save()
def action_exit_modal(self) -> None:
self.main_screen.action_exit_modal()
def main(config):
app = ISponsorBlockTVSetup(config)
app.run()

View File

@@ -1,16 +1,16 @@
import asyncio
import json
import sys
from typing import Any, List
import pyytlounge
from aiohttp import ClientSession
from pyytlounge.wrapper import NotLinkedException, api_base, as_aiter, Dict
from uuid import uuid4
from .constants import youtube_client_blacklist
from pyytlounge.api import api_base
from pyytlounge.exceptions import NotLinkedException
from pyytlounge.util import as_aiter
create_task = asyncio.create_task
@@ -34,8 +34,6 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
self.logger = logger
self.shorts_disconnected = False
self.auto_play = True
self.watchdog_running = False
self.last_event_time = 0
if config:
self.mute_ads = config.mute_ads
self.skip_ads = config.skip_ads
@@ -44,58 +42,21 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
# Ensures that we still are subscribed to the lounge
async def _watchdog(self):
"""
Continuous watchdog that monitors for connection health.
If no events are received within the expected timeframe,
it cancels the current subscription.
"""
self.watchdog_running = True
self.last_event_time = asyncio.get_event_loop().time()
await asyncio.sleep(
35
) # YouTube sends at least a message every 30 seconds (no-op or any other)
try:
while self.watchdog_running:
await asyncio.sleep(10)
current_time = asyncio.get_event_loop().time()
time_since_last_event = current_time - self.last_event_time
# YouTube sends a message at least every 30 seconds
if time_since_last_event > 60:
self.logger.debug(
f"Watchdog triggered: No events for {time_since_last_event:.1f} seconds"
)
# Cancel current subscription
if self.subscribe_task and not self.subscribe_task.done():
self.subscribe_task.cancel()
await asyncio.sleep(1) # Give it time to cancel
except asyncio.CancelledError:
self.logger.debug("Watchdog task cancelled")
self.watchdog_running = False
except BaseException as e:
self.logger.error(f"Watchdog error: {e}")
self.watchdog_running = False
self.subscribe_task.cancel()
except BaseException:
pass
# Subscribe to the lounge and start the watchdog
async def subscribe_monitored(self, callback):
self.callback = callback
# Stop existing watchdog if running
if self.subscribe_task_watchdog and not self.subscribe_task_watchdog.done():
self.watchdog_running = False
try:
self.subscribe_task_watchdog.cancel()
try:
await self.subscribe_task_watchdog
except (asyncio.CancelledError, Exception):
pass
# Start new subscription
if self.subscribe_task and not self.subscribe_task.done():
self.subscribe_task.cancel()
try:
await self.subscribe_task
except (asyncio.CancelledError, Exception):
pass
except BaseException:
pass # No watchdog task
self.subscribe_task = asyncio.create_task(super().subscribe(callback))
self.subscribe_task_watchdog = asyncio.create_task(self._watchdog())
return self.subscribe_task
@@ -104,9 +65,13 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
# skipcq: PY-R1000
def _process_event(self, event_type: str, args: List[Any]):
self.logger.debug(f"process_event({event_type}, {args})")
# Update last event time for the watchdog
self.last_event_time = asyncio.get_event_loop().time()
# (Re)start the watchdog
try:
self.subscribe_task_watchdog.cancel()
except BaseException:
pass
finally:
self.subscribe_task_watchdog = asyncio.create_task(self._watchdog())
# A bunch of events useful to detect ads playing,
# and the next video before it starts playing
# (that way we can get the segments)
@@ -124,7 +89,7 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
create_task(self.mute(False, override=True))
elif event_type == "onAdStateChange":
data = args[0]
if data["adState"] == "0" and data["currentTime"] != "0": # Ad is not playing
if data["adState"] == "0": # Ad is not playing
self.logger.info("Ad has ended, unmuting")
create_task(self.mute(False, override=True))
elif (
@@ -153,8 +118,7 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
if vid_id := data["contentVideoId"]:
self.logger.info(f"Getting segments for next video: {vid_id}")
create_task(self.api_helper.get_segments(vid_id))
if (
elif (
self.skip_ads and data["isSkipEnabled"] == "true"
): # YouTube uses strings for booleans
self.logger.info("Ad can be skipped, skipping")
@@ -241,57 +205,41 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
await self.conn.close()
self.session = web_session
def _common_connection_parameters(self) -> Dict[str, Any]:
return {
"name": self.device_name,
"loungeIdToken": self.auth.lounge_id_token,
"SID": self._sid,
"AID": self._last_event_id,
"gsessionid": self._gsession,
"device": "REMOTE_CONTROL",
"app": "ytios-phone-20.15.1",
"VER": "8",
"v": "2",
}
# TODO: Open a PR upstream to specify the connect_body.method
# if this works
async def connect(self) -> bool:
"""Attempt to connect using the previously set tokens"""
if not self.linked():
raise NotLinkedException("Not linked")
connect_body = {
"id": self.auth.screen_id,
"app": "web",
"mdx-version": "3",
"TYPE": "xmlhttp",
"theme": "cl",
"sessionSource": "MDX_SESSION_SOURCE_UNKNOWN",
"connectParams": '{"setStatesParams": "{"playbackSpeed":0}"}',
"RID": "1",
"CVER": "1",
"capabilities": "que,dsdtr,atp,vsp",
"ui": "false",
"app": "ytios-phone-20.15.1",
"pairing_type": "manual",
"VER": "8",
"loungeIdToken": self.auth.lounge_id_token,
"device": "REMOTE_CONTROL",
"name": self.device_name,
"id": self.auth.screen_id,
"device": "REMOTE_CONTROL",
"capabilities": "que,dsdtr,atp,vsp",
"method": "getNowPlaying",
"magnaKey": "cloudPairedDevice",
"ui": "false",
"deviceContext": "user_agent=dunno&window_width_points=&window_height_points=&os_name=android&ms=",
"theme": "cl",
"loungeIdToken": self.auth.lounge_id_token,
}
connect_url = f"{api_base}/bc/bind"
connect_url = (
f"{api_base}/bc/bind?RID=1&VER=8&CVER=1&auth_failure_option=send_error"
)
async with self.session.post(url=connect_url, data=connect_body) as resp:
try:
text = await resp.text()
if resp.status == 401:
if "Connection denied" in text:
self._logger.warning(
"Connection denied, attempting to circumvent the issue"
)
await self.connect_as_screen()
# self._lounge_token_expired()
self._lounge_token_expired()
return False
if resp.status != 200:
self._logger.warning("Unknown reply to connect %i %s", resp.status, resp.reason)
self._logger.warning(
"Unknown reply to connect %i %s", resp.status, resp.reason
)
return False
lines = text.splitlines()
async for events in self._parse_event_chunks(as_aiter(lines)):
@@ -304,46 +252,4 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
resp.status,
resp.reason,
)
raise
async def connect_as_screen(self) -> bool:
"""Attempt to connect using the previously set tokens"""
if not self.linked():
raise NotLinkedException("Not linked")
connect_body = {
"id": str(uuid4()),
"mdx-version": "3",
"TYPE": "xmlhttp",
"theme": "cl",
"sessionSource": "MDX_SESSION_SOURCE_UNKNOWN",
"connectParams": '{"setStatesParams": "{"playbackSpeed":0}"}',
"sessionNonce": str(uuid4()),
"RID": "1",
"CVER": "1",
"capabilities": "que,dsdtr,atp,vsp",
"ui": "false",
"app": "ytios-phone-20.15.1",
"pairing_type": "manual",
"VER": "8",
"loungeIdToken": self.auth.lounge_id_token,
"device": "LOUNGE_SCREEN",
"name": self.device_name,
}
connect_url = f"{api_base}/bc/bind"
async with self.session.post(url=connect_url, data=connect_body) as resp:
try:
await resp.text()
self.logger.error(
"Connected as screen: please force close the app on the device for iSponsorBlockTV to work properly"
)
self.logger.warn("Exiting in 5 seconds")
await asyncio.sleep(5)
sys.exit(0)
except:
self._logger.exception(
"Handle connect failed, status %s reason %s",
resp.status,
resp.reason,
)
raise
raise