Compare commits

...

52 Commits

Author SHA1 Message Date
dmunozv04
1ab7e73b52 bump version 2024-05-29 23:41:42 +02:00
David
d310e4c817 Merge pull request #160 from dmunozv04/update-dependencies
Update dependencies
2024-05-29 23:39:01 +02:00
pre-commit-ci[bot]
d21bb6320f [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2024-05-29 21:31:56 +00:00
dmunozv04
dd42e20dc4 Remove session closer, it seems to break the config setup 2024-05-05 19:16:27 +02:00
dmunozv04
213ae97bf2 Fix web_session 2024-05-05 19:07:03 +02:00
dmunozv04
582b9bf725 Patch the main aiohttp.ClientSession() into YTlounge 2024-04-27 19:26:55 +02:00
dmunozv04
ce95b6dbf0 Update dependencies 2024-04-27 19:17:52 +02:00
dmunozv04
80196b19aa Rework dockerfile and build for armv7 2024-03-12 22:40:23 +01:00
David
4dd6aa1c4d Merge pull request #140 from SShah7433/utilize_logging_standards
Update logging for standards
2024-03-07 09:14:40 +01:00
David
1a5f29fe2a Merge branch 'main' into utilize_logging_standards 2024-03-07 09:14:33 +01:00
David
13fe1f69ae Bump version 2024-02-14 22:07:51 +01:00
David
3b1ce5297f Merge pull request #136 from PetkoVasilev/main
fix for overlapping segments
2024-02-12 09:07:40 +01:00
Sidd Shah
e689a713ef remove unused code from prev logging implementation 2024-02-02 22:34:01 -05:00
Sidd Shah
e6b1e14d80 remove unused argument 2024-02-02 22:30:58 -05:00
pre-commit-ci[bot]
4934eff8b7 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2024-02-03 03:27:03 +00:00
Sidd Shah
674a13e43a utilize standard logging libraries and formats 2024-02-02 22:22:08 -05:00
David
152ba104a6 Merge branch 'main' into main 2024-01-31 11:47:32 +01:00
dmunozv04
265c56f3d6 bump version 2024-01-29 22:28:07 +01:00
David
7e954478f2 Merge branch 'main' into main 2024-01-29 21:47:07 +01:00
David
4f39f64ed0 Merge pull request #134 from bertybuttface/patch-5
Fix list_to_tuple
2024-01-29 21:46:48 +01:00
Petko Vasilev
8208a51176 more understandable logic (same result) 2024-01-29 12:39:22 +02:00
pre-commit-ci[bot]
ab6b67f88b [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2024-01-28 08:11:55 +00:00
Petko Vasilev
9068b58bf6 fix for overlapping segments 2024-01-28 09:59:47 +02:00
bertybuttface
a9f9a5b31c Update api_helpers.py
Fix list to tuple
2024-01-27 16:36:31 +00:00
David
a75dd83548 Merge pull request #132 from dmunozv04/open-file-utf-8
open and close config file with
2024-01-17 14:25:04 +01:00
dmunozv04
4e3c9d115c open and close config file with
utf-8 encoding
2024-01-15 21:53:08 +01:00
David
04533162cb Merge pull request #129 from dmunozv04/make-docker-image-smaller
Make docker image smaller by caching .pyc and removing .py files
2024-01-14 21:11:41 +01:00
pre-commit-ci[bot]
bee2a9c80f [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2024-01-14 15:39:02 +00:00
dmunozv04
f620ed2fcc Make docker image smaller by caching .pyc and removing .py files 2024-01-14 16:38:39 +01:00
David
db26bff3d2 Merge pull request #128 from dmunozv04/deepsource-autofix-c896b47c
refactor: refactor unnecessary `else` / `elif` when `if` block has a `continue` statement
2024-01-14 15:19:53 +01:00
David
d205848132 Merge pull request #127 from dmunozv04/deepsource-autofix-75d7989d
refactor: replace range(len(...)) with enumerate(...)
2024-01-14 15:19:22 +01:00
David
29445e678f Merge pull request #126 from dmunozv04/deepsource-autofix-097a200c
refactor: refactor unnecessary `else` / `elif` when `if` block has a `return` statement
2024-01-14 15:19:04 +01:00
deepsource-autofix[bot]
35453bc49e refactor: refactor unnecessary else / elif when if block has a continue statement
The use of `else` or `elif` becomes redundant and can be dropped if the last statement under the leading `if` / `elif` block is a `continue` statement.
In the case of an `elif` after `continue`, it can be written as a separate `if` block.
For `else` blocks after `continue`, the statements can be shifted out of `else`. Please refer to the examples below for reference.

Refactoring the code this way can improve code-readability and make it easier to maintain.
2024-01-14 14:18:37 +00:00
deepsource-autofix[bot]
88875a82d3 refactor: replace range(len(...)) with enumerate(...)
Using `range(len(...))` is not pythonic. Python does not have not index-based loops. Instead, it uses collection iterators.  Python has a built-in method `enumerate` which adds a counter to an iterable.
2024-01-14 14:18:20 +00:00
deepsource-autofix[bot]
446393b078 refactor: refactor unnecessary else / elif when if block has a return statement
The use of `else` or `elif` becomes redundant and can be dropped if the last statement under the leading `if` / `elif` block is a `return` statement.
In the case of an `elif` after `return`, it can be written as a separate `if` block.
For `else` blocks after `return`, the statements can be shifted out of `else`. Please refer to the examples below for reference.

Refactoring the code this way can improve code-readability and make it easier to maintain.
2024-01-14 14:18:05 +00:00
David
784d54c4e2 Merge pull request #125 from dmunozv04/add-pre-commit
add pre-commit
2024-01-14 15:12:02 +01:00
pre-commit-ci[bot]
66b39c4cac [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2024-01-14 14:10:15 +00:00
dmunozv04
321a9e6e9b reformat DIAL license 2024-01-14 15:09:30 +01:00
pre-commit-ci[bot]
ecb3583c35 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2024-01-14 14:03:37 +00:00
dmunozv04
3cee674e91 Merge remote-tracking branch 'origin/add-pre-commit' into add-pre-commit 2024-01-14 15:03:17 +01:00
dmunozv04
d575a296e7 setup black --preview 2024-01-14 15:03:01 +01:00
pre-commit-ci[bot]
7bf52b6df1 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2024-01-14 13:53:04 +00:00
dmunozv04
846ea444d2 Merge remote-tracking branch 'origin/add-pre-commit' into add-pre-commit 2024-01-14 14:52:56 +01:00
dmunozv04
1671d7841b set black for max line length 2024-01-14 14:52:38 +01:00
pre-commit-ci[bot]
a4f0b5fffe [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2024-01-14 13:47:14 +00:00
dmunozv04
dae27e7aa3 add pre-commit 2024-01-14 14:46:46 +01:00
David
5df90a234d Merge pull request #122 from dmunozv04/improve-logging-and-shorts-hack
Improve logging and shorts hack
2024-01-14 14:41:12 +01:00
dmunozv04
c3fd67df27 Clean code and fix #121 2023-12-29 16:19:44 +01:00
dmunozv04
35652b6247 Improve exit behaviour 2023-12-29 15:39:41 +01:00
dmunozv04
8ab9cf9519 Improve logging and add hack to view
one short
2023-12-26 19:34:24 +01:00
dmunozv04
f4fbbcdff5 Remove connect fix now that it's been pushed upstream 2023-12-19 08:24:45 +01:00
David
385ed8268c Update package version 2023-12-09 19:22:45 +01:00
24 changed files with 836 additions and 395 deletions

View File

@@ -67,9 +67,10 @@ jobs:
uses: docker/build-push-action@v4 uses: docker/build-push-action@v4
with: with:
context: . context: .
platforms: linux/amd64, linux/arm64 platforms: linux/amd64, linux/arm64, linux/arm/v7
push: ${{ github.event_name != 'pull_request' }} push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }} tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }} labels: ${{ steps.meta.outputs.labels }}
cache-from: type=registry,ref=ghcr.io/dmunozv04/isponsorblocktv:buildcache cache-from: type=registry,ref=ghcr.io/dmunozv04/isponsorblocktv:buildcache
cache-to: type=registry,ref=ghcr.io/dmunozv04/isponsorblocktv:buildcache,mode=max # Only cache if it's not a pull request
cache-to: ${{ github.event_name != 'pull_request' && 'type=registry,ref=ghcr.io/dmunozv04/isponsorblocktv:buildcache,mode=max' || '' }}

39
.pre-commit-config.yaml Normal file
View File

@@ -0,0 +1,39 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
# Inspired by textual pre-commit config
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.3.0
hooks:
- id: check-ast # simply checks whether the files parse as valid python
- id: check-builtin-literals # requires literal syntax when initializing empty or zero python builtin types
- id: check-case-conflict # checks for files that would conflict in case-insensitive filesystems
- id: check-merge-conflict # checks for files that contain merge conflict strings
- id: check-json # checks json files for parseable syntax
- id: check-toml # checks toml files for parseable syntax
- id: check-yaml # checks yaml files for parseable syntax
args: [ '--unsafe' ] # Instead of loading the files, parse them for syntax.
- id: check-shebang-scripts-are-executable # ensures that (non-binary) files with a shebang are executable
- id: check-vcs-permalinks # ensures that links to vcs websites are permalinks
- id: end-of-file-fixer # ensures that a file is either empty, or ends with one newline
- id: mixed-line-ending # replaces or checks mixed line ending
- id: trailing-whitespace # checks for trailing whitespace
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
name: isort (python)
language_version: '3.11'
args: ["--profile", "black", "--filter-files"]
- repo: https://github.com/psf/black
rev: 23.1.0
hooks:
- id: black
language_version: '3.11'
args: ["--preview"]
- repo: https://github.com/hadialqattan/pycln # removes unused imports
rev: v2.3.0
hooks:
- id: pycln
language_version: "3.11"
args: [--all]

View File

@@ -1,19 +1,38 @@
# syntax=docker/dockerfile:1 # syntax=docker/dockerfile:1
FROM python:3.11-alpine3.19 as BASE
FROM python:3.11-alpine FROM base as compiler
WORKDIR /app
COPY src .
RUN python3 -m compileall -b -f . && \
find . -name "*.py" -type f -delete
FROM base as DEP_INSTALLER
COPY requirements.txt .
RUN apk add --no-cache gcc musl-dev && \
pip install --upgrade pip wheel && \
pip install --user -r requirements.txt && \
pip uninstall -y pip wheel && \
apk del gcc musl-dev && \
python3 -m compileall -b -f /root/.local/lib/python3.11/site-packages && \
find /root/.local/lib/python3.11/site-packages -name "*.py" -type f -delete && \
find /root/.local/lib/python3.11/ -name "__pycache__" -type d -exec rm -rf {} +
FROM base
ENV PIP_NO_CACHE_DIR=off iSPBTV_docker=True iSPBTV_data_dir=data TERM=xterm-256color COLORTERM=truecolor ENV PIP_NO_CACHE_DIR=off iSPBTV_docker=True iSPBTV_data_dir=data TERM=xterm-256color COLORTERM=truecolor
COPY requirements.txt . COPY requirements.txt .
RUN pip install --upgrade pip wheel && \ COPY --from=dep_installer /root/.local /root/.local
pip install -r requirements.txt
WORKDIR /app WORKDIR /app
RUN python -m compileall COPY --from=compiler /app .
COPY src . ENTRYPOINT ["python3", "-u", "main.pyc"]
ENTRYPOINT ["python3", "-u", "main.py"]

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "iSponsorBlockTV" name = "iSponsorBlockTV"
version = "2.0.3" version = "2.0.7"
authors = [ authors = [
{"name" = "dmunozv04"} {"name" = "dmunozv04"}
] ]
@@ -28,3 +28,6 @@ files = ["requirements.txt"]
[build-system] [build-system]
requires = ["hatchling", "hatch-requirements-txt"] requires = ["hatchling", "hatch-requirements-txt"]
build-backend = "hatchling.build" build-backend = "hatchling.build"
[tool.black]
line-length = 88

View File

@@ -1,10 +1,10 @@
aiohttp==3.9.0 aiohttp==3.9.5
appdirs==1.4.4 appdirs==1.4.4
argparse==1.4.0 argparse==1.4.0
async-cache==1.1.1 async-cache==1.1.1
pyytlounge==1.6.3 pyytlounge==2.0.0
rich==13.6.0 rich==13.7.1
ssdp==1.3.0 ssdp==1.3.0
textual==0.40.0 textual==0.58.0
textual-slider==0.1.1 textual-slider==0.1.1
xmltodict==0.13.0 xmltodict==0.13.0

View File

@@ -1,5 +1,6 @@
from . import helpers from . import helpers
def main(): def main():
helpers.app_start() helpers.app_start()

View File

@@ -1,16 +1,18 @@
from cache import AsyncLRU
from .conditional_ttl_cache import AsyncConditionalTTL
from . import constants, dial_client
from hashlib import sha256
from aiohttp import ClientSession
import html import html
from hashlib import sha256
from aiohttp import ClientSession
from cache import AsyncLRU
from . import constants, dial_client
from .conditional_ttl_cache import AsyncConditionalTTL
def listToTuple(function): def list_to_tuple(function):
def wrapper(*args): def wrapper(*args):
args = [tuple(x) if type(x) == list else x for x in args] args = [tuple(x) if isinstance(x, list) else x for x in args]
result = function(*args) result = function(*args)
result = tuple(result) if type(result) == list else result result = tuple(result) if isinstance(result, list) else result
return result return result
return wrapper return wrapper
@@ -72,7 +74,13 @@ class ApiHelper:
@AsyncLRU(maxsize=10) @AsyncLRU(maxsize=10)
async def search_channels(self, channel): async def search_channels(self, channel):
channels = [] channels = []
params = {"q": channel, "key": self.apikey, "part": "snippet", "type": "channel", "maxResults": "5"} params = {
"q": channel,
"key": self.apikey,
"part": "snippet",
"type": "channel",
"maxResults": "5",
}
url = constants.Youtube_api + "search" url = constants.Youtube_api + "search"
async with self.web_session.get(url, params=params) as resp: async with self.web_session.get(url, params=params) as resp:
data = await resp.json() data = await resp.json()
@@ -80,30 +88,42 @@ class ApiHelper:
return channels return channels
for i in data["items"]: for i in data["items"]:
# Get channel subcription number # Get channel subscription number
params = {"id": i["snippet"]["channelId"], "key": self.apikey, "part": "statistics"} params = {
"id": i["snippet"]["channelId"],
"key": self.apikey,
"part": "statistics",
}
url = constants.Youtube_api + "channels" url = constants.Youtube_api + "channels"
async with self.web_session.get(url, params=params) as resp: async with self.web_session.get(url, params=params) as resp:
channelData = await resp.json() channel_data = await resp.json()
if channelData["items"][0]["statistics"]["hiddenSubscriberCount"]: if channel_data["items"][0]["statistics"]["hiddenSubscriberCount"]:
subCount = "Hidden" sub_count = "Hidden"
else: else:
subCount = int(channelData["items"][0]["statistics"]["subscriberCount"]) sub_count = int(
subCount = format(subCount, "_") channel_data["items"][0]["statistics"]["subscriberCount"]
)
sub_count = format(sub_count, "_")
channels.append((i["snippet"]["channelId"], i["snippet"]["channelTitle"], subCount)) channels.append(
(i["snippet"]["channelId"], i["snippet"]["channelTitle"], sub_count)
)
return channels return channels
@listToTuple # Convert list to tuple so it can be used as a key in the cache @list_to_tuple # Convert list to tuple so it can be used as a key in the cache
@AsyncConditionalTTL(time_to_live=300, maxsize=10) # 5 minutes for non-locked segments @AsyncConditionalTTL(
time_to_live=300, maxsize=10
) # 5 minutes for non-locked segments
async def get_segments(self, vid_id): async def get_segments(self, vid_id):
if await self.is_whitelisted(vid_id): if await self.is_whitelisted(vid_id):
print("Video is whitelisted") return (
return ([], True) # Return empty list and True to indicate that the cache should last forever [],
True,
) # Return empty list and True to indicate that the cache should last forever
vid_id_hashed = sha256(vid_id.encode("utf-8")).hexdigest()[ vid_id_hashed = sha256(vid_id.encode("utf-8")).hexdigest()[
:4 :4
] # Hashes video id and gets the first 4 characters ] # Hashes video id and gets the first 4 characters
params = { params = {
"category": self.skip_categories, "category": self.skip_categories,
"actionType": constants.SponsorBlock_actiontype, "actionType": constants.SponsorBlock_actiontype,
@@ -111,14 +131,17 @@ class ApiHelper:
} }
headers = {"Accept": "application/json"} headers = {"Accept": "application/json"}
url = constants.SponsorBlock_api + "skipSegments/" + vid_id_hashed url = constants.SponsorBlock_api + "skipSegments/" + vid_id_hashed
async with self.web_session.get(url, headers=headers, params=params) as response: async with self.web_session.get(
url, headers=headers, params=params
) as response:
response_json = await response.json() response_json = await response.json()
if response.status != 200: if response.status != 200:
response_text = await response.text() response_text = await response.text()
print( print(
f"Error getting segments for video {vid_id}, hashed as {vid_id_hashed}. " f"Error getting segments for video {vid_id}, hashed as {vid_id_hashed}."
f"Code: {response.status} - {response_text}") f" Code: {response.status} - {response_text}"
return ([], True) )
return [], True
for i in response_json: for i in response_json:
if str(i["videoID"]) == str(vid_id): if str(i["videoID"]) == str(vid_id):
response_json = i response_json = i
@@ -130,8 +153,27 @@ class ApiHelper:
segments = [] segments = []
ignore_ttl = True ignore_ttl = True
try: try:
for i in response["segments"]: response_segments = response["segments"]
ignore_ttl = ignore_ttl and i["locked"] == 1 # If all segments are locked, ignore ttl # sort by end
response_segments.sort(key=lambda x: x["segment"][1])
# extend ends of overlapping segments to make one big segment
for i in response_segments:
for j in response_segments:
if j["segment"][0] <= i["segment"][1] <= j["segment"][1]:
i["segment"][1] = j["segment"][1]
# sort by start
response_segments.sort(key=lambda x: x["segment"][0])
# extend starts of overlapping segments to make one big segment
for i in reversed(response_segments):
for j in reversed(response_segments):
if j["segment"][0] <= i["segment"][0] <= j["segment"][1]:
i["segment"][0] = j["segment"][0]
for i in response_segments:
ignore_ttl = (
ignore_ttl and i["locked"] == 1
) # If all segments are locked, ignore ttl
segment = i["segment"] segment = i["segment"]
UUID = i["UUID"] UUID = i["UUID"]
segment_dict = {"start": segment[0], "end": segment[1], "UUID": [UUID]} segment_dict = {"start": segment[0], "end": segment[1], "UUID": [UUID]}
@@ -144,21 +186,21 @@ class ApiHelper:
except Exception: except Exception:
segment_before_end = -10 segment_before_end = -10
if ( if (
segment_dict["start"] - segment_before_end < 1 segment_dict["start"] - segment_before_end < 1
): # Less than 1 second appart, combine them and skip them together ): # Less than 1 second apart, combine them and skip them together
segment_dict["start"] = segment_before_start segment_dict["start"] = segment_before_start
segment_dict["UUID"].extend(segment_before_UUID) segment_dict["UUID"].extend(segment_before_UUID)
segments.pop() segments.pop()
segments.append(segment_dict) segments.append(segment_dict)
except Exception: except Exception:
pass pass
return (segments, ignore_ttl) return segments, ignore_ttl
async def mark_viewed_segments(self, UUID): async def mark_viewed_segments(self, uuids):
"""Marks the segments as viewed in the SponsorBlock API, if skip_count_tracking is enabled. """Marks the segments as viewed in the SponsorBlock API, if skip_count_tracking is enabled.
Lets the contributor know that someone skipped the segment (thanks)""" Lets the contributor know that someone skipped the segment (thanks)"""
if self.skip_count_tracking: if self.skip_count_tracking:
for i in UUID: for i in uuids:
url = constants.SponsorBlock_api + "viewedVideoSponsorTime/" url = constants.SponsorBlock_api + "viewedVideoSponsorTime/"
params = {"UUID": i} params = {"UUID": i}
await self.web_session.post(url, params=params) await self.web_session.post(url, params=params)

View File

@@ -1,3 +1,8 @@
import datetime
from cache.key import KEY
from cache.lru import LRU
"""MIT License """MIT License
Copyright (c) 2020 Rajat Singh Copyright (c) 2020 Rajat Singh
@@ -19,11 +24,7 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.""" SOFTWARE."""
'''Modified code from https://github.com/iamsinghrajat/async-cache''' """Modified code from https://github.com/iamsinghrajat/async-cache"""
from cache.key import KEY
from cache.lru import LRU
import datetime
class AsyncConditionalTTL: class AsyncConditionalTTL:
@@ -31,22 +32,20 @@ class AsyncConditionalTTL:
def __init__(self, time_to_live, maxsize): def __init__(self, time_to_live, maxsize):
super().__init__(maxsize=maxsize) super().__init__(maxsize=maxsize)
self.time_to_live = datetime.timedelta( self.time_to_live = (
seconds=time_to_live datetime.timedelta(seconds=time_to_live) if time_to_live else None
) if time_to_live else None )
self.maxsize = maxsize self.maxsize = maxsize
def __contains__(self, key): def __contains__(self, key):
if key not in self.keys(): if key not in self.keys():
return False return False
else: key_expiration = super().__getitem__(key)[1]
key_expiration = super().__getitem__(key)[1] if key_expiration and key_expiration < datetime.datetime.now():
if key_expiration and key_expiration < datetime.datetime.now(): del self[key]
del self[key] return False
return False return True
else:
return True
def __getitem__(self, key): def __getitem__(self, key):
value = super().__getitem__(key)[0] value = super().__getitem__(key)[0]
@@ -55,13 +54,13 @@ class AsyncConditionalTTL:
def __setitem__(self, key, value): def __setitem__(self, key, value):
value, ignore_ttl = value # unpack tuple value, ignore_ttl = value # unpack tuple
ttl_value = ( ttl_value = (
datetime.datetime.now() + self.time_to_live (datetime.datetime.now() + self.time_to_live)
) if (self.time_to_live and not ignore_ttl) else None # ignore ttl if ignore_ttl is True if (self.time_to_live and not ignore_ttl)
else None
) # ignore ttl if ignore_ttl is True
super().__setitem__(key, (value, ttl_value)) super().__setitem__(key, (value, ttl_value))
def __init__( def __init__(self, time_to_live=60, maxsize=1024, skip_args: int = 0):
self, time_to_live=60, maxsize=1024, skip_args: int = 0
):
""" """
:param time_to_live: Use time_to_live as None for non expiring cache :param time_to_live: Use time_to_live as None for non expiring cache
@@ -73,7 +72,7 @@ class AsyncConditionalTTL:
def __call__(self, func): def __call__(self, func):
async def wrapper(*args, **kwargs): async def wrapper(*args, **kwargs):
key = KEY(args[self.skip_args:], kwargs) key = KEY(args[self.skip_args :], kwargs)
if key in self.ttl: if key in self.ttl:
val = self.ttl[key] val = self.ttl[key]
else: else:

View File

@@ -1,13 +1,21 @@
import asyncio import asyncio
import aiohttp import aiohttp
from . import api_helpers, ytlounge from . import api_helpers, ytlounge
async def pair_device(): async def pair_device(web_session):
try: try:
lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV") lounge_controller = ytlounge.YtLoungeApi(
pairing_code = input("Enter pairing code (found in Settings - Link with TV code): ") "iSponsorBlockTV", web_session=web_session
pairing_code = int(pairing_code.replace("-", "").replace(" ", "")) # remove dashes and spaces )
pairing_code = input(
"Enter pairing code (found in Settings - Link with TV code): "
)
pairing_code = int(
pairing_code.replace("-", "").replace(" ", "")
) # remove dashes and spaces
print("Pairing...") print("Pairing...")
paired = await lounge_controller.pair(pairing_code) paired = await lounge_controller.pair(pairing_code)
if not paired: if not paired:
@@ -27,18 +35,27 @@ async def pair_device():
def main(config, debug: bool) -> None: def main(config, debug: bool) -> None:
print("Welcome to the iSponsorBlockTV cli setup wizard") print("Welcome to the iSponsorBlockTV cli setup wizard")
loop = asyncio.get_event_loop_policy().get_event_loop() loop = asyncio.get_event_loop_policy().get_event_loop()
web_session = aiohttp.ClientSession()
if debug: if debug:
loop.set_debug(True) loop.set_debug(True)
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
if hasattr(config, "atvs"): if hasattr(config, "atvs"):
print( print(
"The atvs config option is deprecated and has stopped working. Please read this for more information on " "The atvs config option is deprecated and has stopped working. Please read"
"how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2") " this for more information on how to upgrade to V2:"
if input("Do you want to remove the legacy 'atvs' entry (the app won't start with it present)? (y/n) ") == "y": " \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2"
)
if (
input(
"Do you want to remove the legacy 'atvs' entry (the app won't start"
" with it present)? (y/n) "
)
== "y"
):
del config["atvs"] del config["atvs"]
devices = config.devices devices = config.devices
while not input(f"Paired with {len(devices)} Device(s). Add more? (y/n) ") == "n": while not input(f"Paired with {len(devices)} Device(s). Add more? (y/n) ") == "n":
task = loop.create_task(pair_device()) task = loop.create_task(pair_device(web_session))
loop.run_until_complete(task) loop.run_until_complete(task)
device = task.result() device = task.result()
if device: if device:
@@ -49,57 +66,73 @@ def main(config, debug: bool) -> None:
if apikey: if apikey:
if input("API key already specified. Change it? (y/n) ") == "y": if input("API key already specified. Change it? (y/n) ") == "y":
apikey = input("Enter your API key: ") apikey = input("Enter your API key: ")
config["apikey"] = apikey
else: else:
if input("API key only needed for the channel whitelist function. Add it? (y/n) ") == "y": if (
input(
"API key only needed for the channel whitelist function. Add it? (y/n) "
)
== "y"
):
print( print(
"Get youtube apikey here: https://developers.google.com/youtube/registering_an_application" "Get youtube apikey here:"
" https://developers.google.com/youtube/registering_an_application"
) )
apikey = input("Enter your API key: ") apikey = input("Enter your API key: ")
config["apikey"] = apikey
config.apikey = apikey config.apikey = apikey
skip_categories = config.skip_categories skip_categories = config.skip_categories
if skip_categories: if skip_categories:
if input("Skip categories already specified. Change them? (y/n) ") == "y": if input("Skip categories already specified. Change them? (y/n) ") == "y":
categories = input( categories = input(
"Enter skip categories (space or comma sepparated) Options: [sponsor selfpromo exclusive_access " "Enter skip categories (space or comma sepparated) Options: [sponsor"
"interaction poi_highlight intro outro preview filler music_offtopic]:\n" " selfpromo exclusive_access interaction poi_highlight intro outro"
" preview filler music_offtopic]:\n"
) )
skip_categories = categories.replace(",", " ").split(" ") skip_categories = categories.replace(",", " ").split(" ")
skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings skip_categories = [
x for x in skip_categories if x != ""
] # Remove empty strings
else: else:
categories = input( categories = input(
"Enter skip categories (space or comma sepparated) Options: [sponsor, selfpromo, exclusive_access, " "Enter skip categories (space or comma sepparated) Options: [sponsor,"
"interaction, poi_highlight, intro, outro, preview, filler, music_offtopic:\n" " selfpromo, exclusive_access, interaction, poi_highlight, intro, outro,"
" preview, filler, music_offtopic:\n"
) )
skip_categories = categories.replace(",", " ").split(" ") skip_categories = categories.replace(",", " ").split(" ")
skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings skip_categories = [
x for x in skip_categories if x != ""
] # Remove empty strings
config.skip_categories = skip_categories config.skip_categories = skip_categories
channel_whitelist = config.channel_whitelist channel_whitelist = config.channel_whitelist
if input("Do you want to whitelist any channels from being ad-blocked? (y/n) ") == "y": if (
input("Do you want to whitelist any channels from being ad-blocked? (y/n) ")
== "y"
):
if not apikey: if not apikey:
print( print(
"WARNING: You need to specify an API key to use this function, otherwise the program will fail to " "WARNING: You need to specify an API key to use this function,"
"start.\nYou can add one by re-running this setup wizard.") " otherwise the program will fail to start.\nYou can add one by"
web_session = aiohttp.ClientSession() " re-running this setup wizard."
)
api_helper = api_helpers.ApiHelper(config, web_session) api_helper = api_helpers.ApiHelper(config, web_session)
while True: while True:
channel_info = {} channel_info = {}
channel = input("Enter a channel name or \"/exit\" to exit: ") channel = input('Enter a channel name or "/exit" to exit: ')
if channel == "/exit": if channel == "/exit":
break break
task = loop.create_task(api_helper.search_channels(channel, apikey, web_session)) task = loop.create_task(
api_helper.search_channels(channel, apikey, web_session)
)
loop.run_until_complete(task) loop.run_until_complete(task)
results = task.result() results = task.result()
if len(results) == 0: if len(results) == 0:
print("No channels found") print("No channels found")
continue continue
for i in range(len(results)): for i, item in enumerate(results):
print(f"{i}: {results[i][1]} - Subs: {results[i][2]}") print(f"{i}: {item[1]} - Subs: {item[2]}")
print("5: Enter a custom channel ID") print("5: Enter a custom channel ID")
print("6: Go back") print("6: Go back")
@@ -114,18 +147,23 @@ def main(config, debug: bool) -> None:
channel_info["name"] = input("Enter the channel name: ") channel_info["name"] = input("Enter the channel name: ")
channel_whitelist.append(channel_info) channel_whitelist.append(channel_info)
continue continue
elif choice == "6": if choice == "6":
continue continue
channel_info["id"] = results[int(choice)][0] channel_info["id"] = results[int(choice)][0]
channel_info["name"] = results[int(choice)][1] channel_info["name"] = results[int(choice)][1]
channel_whitelist.append(channel_info) channel_whitelist.append(channel_info)
# Close web session asynchronously # Close web session asynchronously
loop.run_until_complete(web_session.close())
config.channel_whitelist = channel_whitelist config.channel_whitelist = channel_whitelist
config.skip_count_tracking = not input( config.skip_count_tracking = (
"Do you want to report skipped segments to sponsorblock. Only the segment UUID will be sent? (y/n) ") == "n" not input(
"Do you want to report skipped segments to sponsorblock. Only the segment"
" UUID will be sent? (y/n) "
)
== "n"
)
print("Config finished") print("Config finished")
config.save() config.save()
loop.run_until_complete(web_session.close())

View File

@@ -6,16 +6,16 @@ SponsorBlock_api = "https://sponsor.ajay.app/api/"
Youtube_api = "https://www.googleapis.com/youtube/v3/" Youtube_api = "https://www.googleapis.com/youtube/v3/"
skip_categories = ( skip_categories = (
('Sponsor', 'sponsor'), ("Sponsor", "sponsor"),
('Self Promotion', 'selfpromo'), ("Self Promotion", "selfpromo"),
('Intro', 'intro'), ("Intro", "intro"),
('Outro', 'outro'), ("Outro", "outro"),
('Music Offtopic', 'music_offtopic'), ("Music Offtopic", "music_offtopic"),
('Interaction', 'interaction'), ("Interaction", "interaction"),
('Exclusive Access', 'exclusive_access'), ("Exclusive Access", "exclusive_access"),
('POI Highlight', 'poi_highlight'), ("POI Highlight", "poi_highlight"),
('Preview', 'preview'), ("Preview", "preview"),
('Filler', 'filler'), ("Filler", "filler"),
) )
youtube_client_blacklist = ["TVHTML5_FOR_KIDS"] youtube_client_blacklist = ["TVHTML5_FOR_KIDS"]

View File

@@ -1,21 +1,28 @@
"""Send out a M-SEARCH request and listening for responses.""" """Send out an M-SEARCH request and listening for responses."""
import asyncio import asyncio
import socket import socket
import ssdp import ssdp
from ssdp import network
import xmltodict import xmltodict
from ssdp import network
'''Redistribution and use of the DIAL DIscovery And Launch protocol specification (the “DIAL Specification”), """Redistribution and use of the DIAL DIscovery And Launch protocol
with or without modification, are permitted provided that the following conditions are met: ● Redistributions of the specification (the “DIAL Specification”), with or without modification,
DIAL Specification must retain the above copyright notice, this list of conditions and the following disclaimer. ● are permitted provided that the following conditions are met: ●
Redistributions of implementations of the DIAL Specification in source code form must retain the above copyright Redistributions of the DIAL Specification must retain the above copyright
notice, this list of conditions and the following disclaimer. ● Redistributions of implementations of the DIAL notice, this list of conditions and the following disclaimer. ●
Specification in binary form must include the above copyright notice. ● The DIAL mark, the NETFLIX mark and the names Redistributions of implementations of the DIAL Specification in source code
of contributors to the DIAL Specification may not be used to endorse or promote specifications, software, products, form must retain the above copyright notice, this list of conditions and the
or any other materials derived from the DIAL Specification without specific prior written permission. The DIAL mark following disclaimer. ● Redistributions of implementations of the DIAL
is owned by Netflix and information on licensing the DIAL mark is available at www.dial-multiscreen.org.''' Specification in binary form must include the above copyright notice. ● The
DIAL mark, the NETFLIX mark and the names of contributors to the DIAL
Specification may not be used to endorse or promote specifications, software,
products, or any other materials derived from the DIAL Specification without
specific prior written permission. The DIAL mark is owned by Netflix and
information on licensing the DIAL mark is available at
www.dial-multiscreen.org."""
''' """
MIT License MIT License
Copyright (c) 2018 Johannes Hoppe Copyright (c) 2018 Johannes Hoppe
@@ -36,8 +43,9 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.''' SOFTWARE."""
'''Modified code from https://github.com/codingjoe/ssdp/blob/main/ssdp/__main__.py''' """Modified code from
https://github.com/codingjoe/ssdp/blob/main/ssdp/__main__.py"""
def get_ip(): def get_ip():
@@ -45,17 +53,16 @@ def get_ip():
s.settimeout(0) s.settimeout(0)
try: try:
# doesn't even have to be reachable # doesn't even have to be reachable
s.connect(('10.254.254.254', 1)) s.connect(("10.254.254.254", 1))
IP = s.getsockname()[0] ip = s.getsockname()[0]
except Exception: except Exception:
IP = '127.0.0.1' ip = "127.0.0.1"
finally: finally:
s.close() s.close()
return IP return ip
class Handler(ssdp.aio.SSDP): class Handler(ssdp.aio.SSDP):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.devices = [] self.devices = []
@@ -107,7 +114,9 @@ async def discover(web_session):
family, addr = network.get_best_family(bind, network.PORT) family, addr = network.get_best_family(bind, network.PORT)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
ip_address = get_ip() ip_address = get_ip()
connect = loop.create_datagram_endpoint(handler, family=family, local_addr=(ip_address, None)) connect = loop.create_datagram_endpoint(
handler, family=family, local_addr=(ip_address, None)
)
transport, protocol = await connect transport, protocol = await connect
target = network.MULTICAST_ADDRESS_IPV4, network.PORT target = network.MULTICAST_ADDRESS_IPV4, network.PORT

View File

@@ -36,7 +36,7 @@ class Config:
self.devices = [] self.devices = []
self.apikey = "" self.apikey = ""
self.skip_categories = [] self.skip_categories = [] # These are the categories on the config file
self.channel_whitelist = [] self.channel_whitelist = []
self.skip_count_tracking = True self.skip_count_tracking = True
self.mute_ads = False self.mute_ads = False
@@ -46,8 +46,12 @@ class Config:
def validate(self): def validate(self):
if hasattr(self, "atvs"): if hasattr(self, "atvs"):
print( print(
"The atvs config option is deprecated and has stopped working. Please read this for more information " (
"on how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2", "The atvs config option is deprecated and has stopped working."
" Please read this for more information "
"on how to upgrade to V2:"
" \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2"
),
) )
print("Exiting in 10 seconds...") print("Exiting in 10 seconds...")
time.sleep(10) time.sleep(10)
@@ -59,14 +63,16 @@ class Config:
sys.exit() sys.exit()
self.devices = [Device(i) for i in self.devices] self.devices = [Device(i) for i in self.devices]
if not self.apikey and self.channel_whitelist: if not self.apikey and self.channel_whitelist:
raise ValueError("No youtube API key found and channel whitelist is not empty") raise ValueError(
"No youtube API key found and channel whitelist is not empty"
)
if not self.skip_categories: if not self.skip_categories:
self.categories = ["sponsor"] self.skip_categories = ["sponsor"]
print("No categories found, using default: sponsor") print("No categories found, using default: sponsor")
def __load(self): def __load(self):
try: try:
with open(self.config_file, "r") as f: with open(self.config_file, "r", encoding="utf-8") as f:
config = json.load(f) config = json.load(f)
for i in config: for i in config:
if i not in config_file_blacklist_keys: if i not in config_file_blacklist_keys:
@@ -79,11 +85,22 @@ class Config:
print("Creating data directory") print("Creating data directory")
os.makedirs(self.data_dir) os.makedirs(self.data_dir)
else: # Running in docker without mounting the data dir else: # Running in docker without mounting the data dir
print("Running in docker without mounting the data dir, check the wiki for more information: " print(
"https://github.com/dmunozv04/iSponsorBlockTV/wiki/Installation#Docker") "Running in docker without mounting the data dir, check the"
print("This image has recently been updated to v2, and requires changes.", " wiki for more information: "
"Please read this for more information on how to upgrade to V2:", "https://github.com/dmunozv04/iSponsorBlockTV/wiki/Installation#Docker"
"https://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2") )
print(
(
"This image has recently been updated to v2, and requires"
" changes."
),
(
"Please read this for more information on how to upgrade"
" to V2:"
),
"https://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2",
)
print("Exiting in 10 seconds...") print("Exiting in 10 seconds...")
time.sleep(10) time.sleep(10)
sys.exit() sys.exit()
@@ -91,7 +108,7 @@ class Config:
print("Blank config file created") print("Blank config file created")
def save(self): def save(self):
with open(self.config_file, "w") as f: with open(self.config_file, "w", encoding="utf-8") as f:
config_dict = self.__dict__ config_dict = self.__dict__
# Don't save the config file name # Don't save the config file name
config_file = self.config_file config_file = self.config_file
@@ -109,12 +126,23 @@ class Config:
def app_start(): def app_start():
#If env has a data dir use that, otherwise use the default # If env has a data dir use that, otherwise use the default
default_data_dir = os.getenv("iSPBTV_data_dir") or user_data_dir("iSponsorBlockTV", "dmunozv04") default_data_dir = os.getenv("iSPBTV_data_dir") or user_data_dir(
"iSponsorBlockTV", "dmunozv04"
)
parser = argparse.ArgumentParser(description="iSponsorblockTV") parser = argparse.ArgumentParser(description="iSponsorblockTV")
parser.add_argument("--data-dir", "-d", default=default_data_dir, help="data directory") parser.add_argument(
parser.add_argument("--setup", "-s", action="store_true", help="setup the program graphically") "--data-dir", "-d", default=default_data_dir, help="data directory"
parser.add_argument("--setup-cli", "-sc", action="store_true", help="setup the program in the command line") )
parser.add_argument(
"--setup", "-s", action="store_true", help="setup the program graphically"
)
parser.add_argument(
"--setup-cli",
"-sc",
action="store_true",
help="setup the program in the command line",
)
parser.add_argument("--debug", action="store_true", help="debug mode") parser.add_argument("--debug", action="store_true", help="debug mode")
args = parser.parse_args() args = parser.parse_args()

View File

@@ -1,6 +1,8 @@
import plistlib
import os import os
import plistlib
from . import config_setup from . import config_setup
"""Not updated to V2 yet, should still work. Here be dragons""" """Not updated to V2 yet, should still work. Here be dragons"""
default_plist = { default_plist = {
"Label": "com.dmunozv04iSponsorBlockTV", "Label": "com.dmunozv04iSponsorBlockTV",
@@ -40,7 +42,9 @@ def main():
create_plist(correct_path) create_plist(correct_path)
run_setup(correct_path + "/config.json") run_setup(correct_path + "/config.json")
print( print(
"Launch daemon installed. Please restart the computer to enable it or use:\n launchctl load ~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist" "Launch daemon installed. Please restart the computer to enable it or"
" use:\n launchctl load"
" ~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist"
) )
else: else:
if not os.path.exists(correct_path): if not os.path.exists(correct_path):
@@ -48,6 +52,6 @@ def main():
print( print(
"Please move the program to the correct path: " "Please move the program to the correct path: "
+ correct_path + correct_path
+ "opeing now on finder..." + "opening now on finder..."
) )
os.system("open -R " + correct_path) os.system("open -R " + correct_path)

View File

@@ -1,19 +1,36 @@
import asyncio import asyncio
import aiohttp
import time
import logging import logging
import time
from signal import SIGINT, SIGTERM, signal
from typing import Optional
import aiohttp
from . import api_helpers, ytlounge from . import api_helpers, ytlounge
class DeviceListener: class DeviceListener:
def __init__(self, api_helper, config, device): def __init__(self, api_helper, config, device, debug: bool, web_session):
self.task: asyncio.Task = None self.task: Optional[asyncio.Task] = None
self.api_helper = api_helper self.api_helper = api_helper
self.lounge_controller = ytlounge.YtLoungeApi(
device.screen_id, config, api_helper)
self.offset = device.offset self.offset = device.offset
self.name = device.name self.name = device.name
self.cancelled = False self.cancelled = False
self.logger = logging.getLogger(f"iSponsorBlockTV-{device.screen_id}")
self.web_session = web_session
if debug:
self.logger.setLevel(logging.DEBUG)
else:
self.logger.setLevel(logging.INFO)
sh = logging.StreamHandler()
sh.setFormatter(
logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
self.logger.addHandler(sh)
self.logger.info(f"Starting device")
self.lounge_controller = ytlounge.YtLoungeApi(
device.screen_id, config, api_helper, self.logger, self.web_session
)
# Ensures that we have a valid auth token # Ensures that we have a valid auth token
async def refresh_auth_loop(self): async def refresh_auth_loop(self):
@@ -37,9 +54,9 @@ class DeviceListener:
lounge_controller = self.lounge_controller lounge_controller = self.lounge_controller
while not lounge_controller.linked(): while not lounge_controller.linked():
try: try:
self.logger.debug("Refreshing auth")
await lounge_controller.refresh_auth() await lounge_controller.refresh_auth()
except: except:
# traceback.print_exc()
await asyncio.sleep(10) await asyncio.sleep(10)
while not self.cancelled: while not self.cancelled:
while not (await self.is_available()) and not self.cancelled: while not (await self.is_available()) and not self.cancelled:
@@ -55,18 +72,18 @@ class DeviceListener:
await lounge_controller.connect() await lounge_controller.connect()
except: except:
pass pass
print(f"Connected to device {lounge_controller.screen_name} ({self.name})") self.logger.info(
"Connected to device %s (%s)", lounge_controller.screen_name, self.name
)
try: try:
# print("Subscribing to lounge") self.logger.info("Subscribing to lounge")
sub = await lounge_controller.subscribe_monitored(self) sub = await lounge_controller.subscribe_monitored(self)
await sub await sub
await asyncio.sleep(10)
except: except:
pass pass
# Method called on playback state change # Method called on playback state change
async def __call__(self, state): async def __call__(self, state):
logging.debug("Playstatus update" + str(state))
try: try:
self.task.cancel() self.task.cancel()
except: except:
@@ -80,7 +97,9 @@ class DeviceListener:
if state.videoId: if state.videoId:
segments = await self.api_helper.get_segments(state.videoId) segments = await self.api_helper.get_segments(state.videoId)
if state.state.value == 1: # Playing if state.state.value == 1: # Playing
print(f"Playing {state.videoId} with {len(segments)} segments") self.logger.info(
f"Playing video {state.videoId} with {len(segments)} segments"
)
if segments: # If there are segments if segments: # If there are segments
await self.time_to_segment(segments, state.currentTime, time_start) await self.time_to_segment(segments, state.currentTime, time_start)
@@ -89,27 +108,28 @@ class DeviceListener:
start_next_segment = None start_next_segment = None
next_segment = None next_segment = None
for segment in segments: for segment in segments:
if position < 2 and ( if position < 2 and (segment["start"] <= position < segment["end"]):
segment["start"] <= position < segment["end"]
):
next_segment = segment next_segment = segment
start_next_segment = position # different variable so segment doesn't change start_next_segment = (
position # different variable so segment doesn't change
)
break break
if segment["start"] > position: if segment["start"] > position:
next_segment = segment next_segment = segment
start_next_segment = next_segment["start"] start_next_segment = next_segment["start"]
break break
if start_next_segment: if start_next_segment:
time_to_next = start_next_segment - position - (time.time() - time_start) - self.offset time_to_next = (
start_next_segment - position - (time.time() - time_start) - self.offset
)
await self.skip(time_to_next, next_segment["end"], next_segment["UUID"]) await self.skip(time_to_next, next_segment["end"], next_segment["UUID"])
# Skips to the next segment (waits for the time to pass) # Skips to the next segment (waits for the time to pass)
async def skip(self, time_to, position, UUID): async def skip(self, time_to, position, uuids):
await asyncio.sleep(time_to) await asyncio.sleep(time_to)
await asyncio.gather( self.logger.info("Skipping segment: seeking to %s", position)
self.lounge_controller.seek_to(position), await asyncio.create_task(self.api_helper.mark_viewed_segments(uuids))
self.api_helper.mark_viewed_segments(UUID) await asyncio.create_task(self.lounge_controller.seek_to(position))
)
# Stops the connection to the device # Stops the connection to the device
async def cancel(self): async def cancel(self):
@@ -136,14 +156,15 @@ def main(config, debug):
web_session = aiohttp.ClientSession(loop=loop, connector=tcp_connector) web_session = aiohttp.ClientSession(loop=loop, connector=tcp_connector)
api_helper = api_helpers.ApiHelper(config, web_session) api_helper = api_helpers.ApiHelper(config, web_session)
for i in config.devices: for i in config.devices:
device = DeviceListener(api_helper, config, i) device = DeviceListener(api_helper, config, i, debug, web_session)
devices.append(device) devices.append(device)
tasks.append(loop.create_task(device.loop())) tasks.append(loop.create_task(device.loop()))
tasks.append(loop.create_task(device.refresh_auth_loop())) tasks.append(loop.create_task(device.refresh_auth_loop()))
try: signal(SIGINT, lambda s, f: loop.stop())
loop.run_forever() signal(SIGTERM, lambda s, f: loop.stop())
except KeyboardInterrupt: loop.run_forever()
print("Keyboard interrupt detected, cancelling tasks and exiting...") print("Cancelling tasks and exiting...")
loop.run_until_complete(finish(devices)) loop.run_until_complete(finish(devices))
finally: loop.run_until_complete(web_session.close())
loop.run_until_complete(web_session.close()) loop.run_until_complete(tcp_connector.close())
loop.close()

View File

@@ -2,15 +2,33 @@ import asyncio
import copy import copy
import aiohttp import aiohttp
# Textual imports (Textual is awesome!) # Textual imports (Textual is awesome!)
from textual import on from textual import on
from textual.app import App, ComposeResult from textual.app import App, ComposeResult
from textual.containers import ScrollableContainer, Grid, Container, Vertical, Horizontal from textual.containers import (
Container,
Grid,
Horizontal,
ScrollableContainer,
Vertical,
)
from textual.events import Click from textual.events import Click
from textual.screen import Screen from textual.screen import Screen
from textual.validation import Function from textual.validation import Function
from textual.widgets import Button, Footer, Header, Static, Label, Input, SelectionList, Checkbox, ContentSwitcher, \ from textual.widgets import (
RadioSet, RadioButton Button,
Checkbox,
ContentSwitcher,
Footer,
Header,
Input,
Label,
RadioButton,
RadioSet,
SelectionList,
Static,
)
from textual.widgets.selection_list import Selection from textual.widgets.selection_list import Selection
from textual_slider import Slider from textual_slider import Slider
@@ -30,7 +48,9 @@ def _validate_pairing_code(pairing_code: str) -> bool:
class ModalWithClickExit(Screen): class ModalWithClickExit(Screen):
"""A modal screen that exits when clicked outside its bounds. """A modal screen that exits when clicked outside its bounds.
https://discord.com/channels/1026214085173461072/1033754296224841768/1136015817356611604""" https://discord.com/channels/1026214085173461072/1033754296224841768/1136015817356611604
"""
DEFAULT_CSS = """ DEFAULT_CSS = """
ModalWithClickExit { ModalWithClickExit {
align: center middle; align: center middle;
@@ -62,8 +82,15 @@ class Element(Static):
pass pass
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Button(label=self.element_name, classes="element-name", disabled=True, id="element-name") yield Button(
yield Button("Remove", classes="element-remove", variant="error", id="element-remove") label=self.element_name,
classes="element-name",
disabled=True,
id="element-name",
)
yield Button(
"Remove", classes="element-remove", variant="error", id="element-remove"
)
def on_mount(self) -> None: def on_mount(self) -> None:
if self.tooltip: if self.tooltip:
@@ -75,12 +102,15 @@ class Device(Element):
"""A device element.""" """A device element."""
def process_values_from_data(self): def process_values_from_data(self):
print("HIIII")
print(self.element_data) print(self.element_data)
if "name" in self.element_data and self.element_data["name"]: if "name" in self.element_data and self.element_data["name"]:
self.element_name = self.element_data["name"] self.element_name = self.element_data["name"]
else: else:
self.element_name = f"Unnamed device with id {self.element_data['screen_id'][:5]}...{self.element_data['screen_id'][-5:]}" self.element_name = (
"Unnamed device with id "
f"{self.element_data['screen_id'][:5]}..."
f"{self.element_data['screen_id'][-5:]}"
)
class Channel(Element): class Channel(Element):
@@ -90,7 +120,9 @@ class Channel(Element):
if "name" in self.element_data: if "name" in self.element_data:
self.element_name = self.element_data["name"] self.element_name = self.element_data["name"]
else: else:
self.element_name = f"Unnamed channel with id {self.element_data['channel_id']}" self.element_name = (
f"Unnamed channel with id {self.element_data['channel_id']}"
)
class ChannelRadio(RadioButton): class ChannelRadio(RadioButton):
@@ -104,18 +136,37 @@ class ChannelRadio(RadioButton):
class MigrationScreen(ModalWithClickExit): class MigrationScreen(ModalWithClickExit):
"""Screen with a dialog to remove old ATVS config.""" """Screen with a dialog to remove old ATVS config."""
BINDINGS = [("escape", "dismiss()", "Cancel"),
("s", "remove_and_save", "Remove and save"), BINDINGS = [
("q,ctrl+c", "exit", "Exit")] ("escape", "dismiss()", "Cancel"),
("s", "remove_and_save", "Remove and save"),
("q,ctrl+c", "exit", "Exit"),
]
AUTO_FOCUS = "#exit-save" AUTO_FOCUS = "#exit-save"
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Grid( yield Grid(
Label( Label(
"Welcome to the new configurator! You seem to have the legacy 'atvs' entry on your config file, do you want to remove it?\n(The app won't start with it present)", (
id="question", classes="button-100"), "Welcome to the new configurator! You seem to have the legacy"
Button("Remove and save", variant="primary", id="migrate-remove-save", classes="button-100"), " 'atvs' entry on your config file, do you want to remove it?\n(The"
Button("Don't remove", variant="error", id="migrate-no-change", classes="button-100"), " app won't start with it present)"
),
id="question",
classes="button-100",
),
Button(
"Remove and save",
variant="primary",
id="migrate-remove-save",
classes="button-100",
),
Button(
"Don't remove",
variant="error",
id="migrate-no-change",
classes="button-100",
),
id="dialog-migration", id="dialog-migration",
) )
@@ -135,16 +186,25 @@ class MigrationScreen(ModalWithClickExit):
class ExitScreen(ModalWithClickExit): class ExitScreen(ModalWithClickExit):
"""Screen with a dialog to exit.""" """Screen with a dialog to exit."""
BINDINGS = [("escape", "dismiss()", "Cancel"),
("s", "save", "Save"), BINDINGS = [
("q,ctrl+c", "exit", "Exit")] ("escape", "dismiss()", "Cancel"),
("s", "save", "Save"),
("q,ctrl+c", "exit", "Exit"),
]
AUTO_FOCUS = "#exit-save" AUTO_FOCUS = "#exit-save"
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Grid( yield Grid(
Label("Are you sure you want to exit without saving?", id="question", classes="button-100"), Label(
"Are you sure you want to exit without saving?",
id="question",
classes="button-100",
),
Button("Save", variant="success", id="exit-save", classes="button-100"), Button("Save", variant="success", id="exit-save", classes="button-100"),
Button("Don't save", variant="error", id="exit-no-save", classes="button-100"), Button(
"Don't save", variant="error", id="exit-no-save", classes="button-100"
),
Button("Cancel", variant="primary", id="exit-cancel", classes="button-100"), Button("Cancel", variant="primary", id="exit-cancel", classes="button-100"),
id="dialog-exit", id="dialog-exit",
) )
@@ -168,39 +228,78 @@ class ExitScreen(ModalWithClickExit):
class AddDevice(ModalWithClickExit): class AddDevice(ModalWithClickExit):
"""Screen with a dialog to add a device, either with a pairing code or with lan discovery.""" """Screen with a dialog to add a device, either with a pairing code or with lan discovery."""
BINDINGS = [("escape", "dismiss({})", "Return")] BINDINGS = [("escape", "dismiss({})", "Return")]
def __init__(self, config, **kwargs) -> None: def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs) super().__init__(**kwargs)
self.config = config self.config = config
web_session = aiohttp.ClientSession() self.web_session = aiohttp.ClientSession()
self.api_helper = api_helpers.ApiHelper(config, web_session) self.api_helper = api_helpers.ApiHelper(config, self.web_session)
self.devices_discovered_dial = [] self.devices_discovered_dial = []
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
with Container(id="add-device-container"): with Container(id="add-device-container"):
yield Label("Add Device", classes="title") yield Label("Add Device", classes="title")
with Grid(id="add-device-switch-buttons"): with Grid(id="add-device-switch-buttons"):
yield Button("Add with pairing code", id="add-device-pin-button", classes="button-switcher") yield Button(
yield Button("Add with lan discovery", id="add-device-dial-button", classes="button-switcher") "Add with pairing code",
with ContentSwitcher(id="add-device-switcher", initial="add-device-pin-container"): id="add-device-pin-button",
classes="button-switcher",
)
yield Button(
"Add with lan discovery",
id="add-device-dial-button",
classes="button-switcher",
)
with ContentSwitcher(
id="add-device-switcher", initial="add-device-pin-container"
):
with Container(id="add-device-pin-container"): with Container(id="add-device-pin-container"):
yield Input(placeholder="Pairing Code (found in Settings - Link with TV code)", yield Input(
id="pairing-code-input", placeholder=(
validators=[ "Pairing Code (found in Settings - Link with TV code)"
Function(_validate_pairing_code, "Invalid pairing code format") ),
] id="pairing-code-input",
) validators=[
yield Input(placeholder="Device Name (auto filled if empty/optional)", id="device-name-input") Function(
yield Button("Add", id="add-device-pin-add-button", variant="success", disabled=True) _validate_pairing_code, "Invalid pairing code format"
)
],
)
yield Input(
placeholder="Device Name (auto filled if empty/optional)",
id="device-name-input",
)
yield Button(
"Add",
id="add-device-pin-add-button",
variant="success",
disabled=True,
)
yield Label(id="add-device-info") yield Label(id="add-device-info")
with Container(id="add-device-dial-container"): with Container(id="add-device-dial-container"):
yield Label( yield Label(
"Make sure your device is on the same network as this computer\nIf it isn't showing up, try restarting the app.\nIf running in docker, make sure to use `--network=host`\nTo refresh the list, close and open the dialog again", (
classes="subtitle") "Make sure your device is on the same network as this"
yield SelectionList(("Searching for devices...", "", False), id="dial-devices-list", disabled=True) " computer\nIf it isn't showing up, try restarting the"
yield Button("Add selected devices", id="add-device-dial-add-button", variant="success", " app.\nIf running in docker, make sure to use"
disabled=True) " `--network=host`\nTo refresh the list, close and open the"
" dialog again"
),
classes="subtitle",
)
yield SelectionList(
("Searching for devices...", "", False),
id="dial-devices-list",
disabled=True,
)
yield Button(
"Add selected devices",
id="add-device-dial-add-button",
variant="success",
disabled=True,
)
async def on_mount(self) -> None: async def on_mount(self) -> None:
self.devices_discovered_dial = [] self.devices_discovered_dial = []
@@ -223,20 +322,27 @@ class AddDevice(ModalWithClickExit):
@on(Button.Pressed, "#add-device-switch-buttons > *") @on(Button.Pressed, "#add-device-switch-buttons > *")
def handle_switch_buttons(self, event: Button.Pressed) -> None: def handle_switch_buttons(self, event: Button.Pressed) -> None:
button_ = event.button.id self.query_one("#add-device-switcher").current = event.button.id.replace(
self.query_one("#add-device-switcher").current = event.button.id.replace("-button", "-container") "-button", "-container"
)
@on(Input.Changed, "#pairing-code-input") @on(Input.Changed, "#pairing-code-input")
def changed_pairing_code(self, event: Input.Changed): def changed_pairing_code(self, event: Input.Changed):
self.query_one("#add-device-pin-add-button").disabled = not event.validation_result.is_valid self.query_one("#add-device-pin-add-button").disabled = (
not event.validation_result.is_valid
)
@on(Input.Submitted, "#pairing-code-input") @on(Input.Submitted, "#pairing-code-input")
@on(Button.Pressed, "#add-device-pin-add-button") @on(Button.Pressed, "#add-device-pin-add-button")
async def handle_add_device_pin(self) -> None: async def handle_add_device_pin(self) -> None:
self.query_one("#add-device-pin-add-button").disabled = True self.query_one("#add-device-pin-add-button").disabled = True
lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV") lounge_controller = ytlounge.YtLoungeApi(
"iSponsorBlockTV", web_session=self.web_session
)
pairing_code = self.query_one("#pairing-code-input").value pairing_code = self.query_one("#pairing-code-input").value
pairing_code = int(pairing_code.replace("-", "").replace(" ", "")) # remove dashes and spaces pairing_code = int(
pairing_code.replace("-", "").replace(" ", "")
) # remove dashes and spaces
device_name = self.parent.query_one("#device-name-input").value device_name = self.parent.query_one("#device-name-input").value
paired = False paired = False
try: try:
@@ -251,7 +357,9 @@ class AddDevice(ModalWithClickExit):
} }
self.query_one("#pairing-code-input").value = "" self.query_one("#pairing-code-input").value = ""
self.query_one("#device-name-input").value = "" self.query_one("#device-name-input").value = ""
self.query_one("#add-device-info").update(f"[#00ff00][b]Successfully added {device['name']}") self.query_one("#add-device-info").update(
f"[#00ff00][b]Successfully added {device['name']}"
)
self.dismiss([device]) self.dismiss([device])
else: else:
self.query_one("#pairing-code-input").value = "" self.query_one("#pairing-code-input").value = ""
@@ -269,11 +377,14 @@ class AddDevice(ModalWithClickExit):
@on(SelectionList.SelectedChanged, "#dial-devices-list") @on(SelectionList.SelectedChanged, "#dial-devices-list")
def changed_device_list(self, event: SelectionList.SelectedChanged): def changed_device_list(self, event: SelectionList.SelectedChanged):
self.query_one("#add-device-dial-add-button").disabled = not event.selection_list.selected self.query_one("#add-device-dial-add-button").disabled = (
not event.selection_list.selected
)
class AddChannel(ModalWithClickExit): class AddChannel(ModalWithClickExit):
"""Screen with a dialog to add a channel, either using search or with a channel id.""" """Screen with a dialog to add a channel, either using search or with a channel id."""
BINDINGS = [("escape", "dismiss(())", "Return")] BINDINGS = [("escape", "dismiss(())", "Return")]
def __init__(self, config, **kwargs) -> None: def __init__(self, config, **kwargs) -> None:
@@ -286,32 +397,78 @@ class AddChannel(ModalWithClickExit):
with Container(id="add-channel-container"): with Container(id="add-channel-container"):
yield Label("Add Channel", classes="title") yield Label("Add Channel", classes="title")
yield Label( yield Label(
"Select a method to add a channel. Adding via search only works if a YouTube api key has been set", (
id="add-channel-label", classes="subtitle") "Select a method to add a channel. Adding via search only works if"
" a YouTube api key has been set"
),
id="add-channel-label",
classes="subtitle",
)
with Grid(id="add-channel-switch-buttons"): with Grid(id="add-channel-switch-buttons"):
yield Button("Add by channel name", id="add-channel-search-button", classes="button-switcher") yield Button(
yield Button("Add by channel ID", id="add-channel-id-button", classes="button-switcher") "Add by channel name",
id="add-channel-search-button",
classes="button-switcher",
)
yield Button(
"Add by channel ID",
id="add-channel-id-button",
classes="button-switcher",
)
yield Label(id="add-channel-info", classes="subtitle") yield Label(id="add-channel-info", classes="subtitle")
with ContentSwitcher(id="add-channel-switcher", initial="add-channel-search-container"): with ContentSwitcher(
id="add-channel-switcher", initial="add-channel-search-container"
):
with Vertical(id="add-channel-search-container"): with Vertical(id="add-channel-search-container"):
if self.config.apikey: if self.config.apikey:
with Grid(id="add-channel-search-inputs"): with Grid(id="add-channel-search-inputs"):
yield Input(placeholder="Enter channel name", id="channel-name-input-search") yield Input(
yield Button("Search", id="search-channel-button", variant="success") placeholder="Enter channel name",
yield RadioSet(RadioButton(label="Search to see results", disabled=True), id="channel-name-input-search",
id="channel-search-results") )
yield Button("Add", id="add-channel-button-search", variant="success", disabled=True, yield Button(
classes="button-100") "Search", id="search-channel-button", variant="success"
)
yield RadioSet(
RadioButton(label="Search to see results", disabled=True),
id="channel-search-results",
)
yield Button(
"Add",
id="add-channel-button-search",
variant="success",
disabled=True,
classes="button-100",
)
else: else:
yield Label( yield Label(
"[#ff0000]No api key set, cannot search for channels. You can add it the config section below", (
id="add-channel-search-no-key", classes="subtitle") "[#ff0000]No api key set, cannot search for channels."
" You can add it the config section below"
),
id="add-channel-search-no-key",
classes="subtitle",
)
with Vertical(id="add-channel-id-container"): with Vertical(id="add-channel-id-container"):
yield Input(placeholder="Enter channel ID (example: UCuAXFkgsw1L7xaCfnd5JJOw)", yield Input(
id="channel-id-input") placeholder=(
yield Input(placeholder="Enter channel name (only used to display in the config file)", "Enter channel ID (example: UCuAXFkgsw1L7xaCfnd5JJOw)"
id="channel-name-input-id") ),
yield Button("Add", id="add-channel-button-id", variant="success", classes="button-100") id="channel-id-input",
)
yield Input(
placeholder=(
"Enter channel name (only used to display in the config"
" file)"
),
id="channel-name-input-id",
)
yield Button(
"Add",
id="add-channel-button-id",
variant="success",
classes="button-100",
)
@on(RadioSet.Changed, "#channel-search-results") @on(RadioSet.Changed, "#channel-search-results")
def handle_radio_set_changed(self, event: RadioSet.Changed) -> None: def handle_radio_set_changed(self, event: RadioSet.Changed) -> None:
@@ -320,14 +477,18 @@ class AddChannel(ModalWithClickExit):
@on(Button.Pressed, "#add-channel-switch-buttons > *") @on(Button.Pressed, "#add-channel-switch-buttons > *")
def handle_switch_buttons(self, event: Button.Pressed) -> None: def handle_switch_buttons(self, event: Button.Pressed) -> None:
button_ = event.button.id button_ = event.button.id
self.query_one("#add-channel-switcher").current = event.button.id.replace("-button", "-container") self.query_one("#add-channel-switcher").current = event.button.id.replace(
"-button", "-container"
)
@on(Button.Pressed, "#search-channel-button") @on(Button.Pressed, "#search-channel-button")
@on(Input.Submitted, "#channel-name-input-search") @on(Input.Submitted, "#channel-name-input-search")
async def handle_search_channel(self) -> None: async def handle_search_channel(self) -> None:
channel_name = self.query_one("#channel-name-input-search").value channel_name = self.query_one("#channel-name-input-search").value
if not channel_name: if not channel_name:
self.query_one("#add-channel-info").update("[#ff0000]Please enter a channel name") self.query_one("#add-channel-info").update(
"[#ff0000]Please enter a channel name"
)
return return
self.query_one("#search-channel-button").disabled = True self.query_one("#search-channel-button").disabled = True
self.query_one("#add-channel-info").update("Searching...") self.query_one("#add-channel-info").update("Searching...")
@@ -336,7 +497,9 @@ class AddChannel(ModalWithClickExit):
try: try:
channels_list = await self.api_helper.search_channels(channel_name) channels_list = await self.api_helper.search_channels(channel_name)
except: except:
self.query_one("#add-channel-info").update("[#ff0000]Failed to search for channel") self.query_one("#add-channel-info").update(
"[#ff0000]Failed to search for channel"
)
self.query_one("#search-channel-button").disabled = False self.query_one("#search-channel-button").disabled = False
return return
for i in channels_list: for i in channels_list:
@@ -349,7 +512,9 @@ class AddChannel(ModalWithClickExit):
def handle_add_channel_search(self) -> None: def handle_add_channel_search(self) -> None:
channel = self.query_one("#channel-search-results").pressed_button.channel_data channel = self.query_one("#channel-search-results").pressed_button.channel_data
if not channel: if not channel:
self.query_one("#add-channel-info").update("[#ff0000]Please select a channel") self.query_one("#add-channel-info").update(
"[#ff0000]Please select a channel"
)
return return
self.query_one("#add-channel-info").update("Adding...") self.query_one("#add-channel-info").update("Adding...")
self.dismiss(channel) self.dismiss(channel)
@@ -361,7 +526,9 @@ class AddChannel(ModalWithClickExit):
channel_id = self.query_one("#channel-id-input").value channel_id = self.query_one("#channel-id-input").value
channel_name = self.query_one("#channel-name-input-id").value channel_name = self.query_one("#channel-name-input-id").value
if not channel_id: if not channel_id:
self.query_one("#add-channel-info").update("[#ff0000]Please enter a channel ID") self.query_one("#add-channel-info").update(
"[#ff0000]Please enter a channel ID"
)
return return
if not channel_name: if not channel_name:
channel_name = channel_id channel_name = channel_id
@@ -372,6 +539,7 @@ class AddChannel(ModalWithClickExit):
class EditDevice(ModalWithClickExit): class EditDevice(ModalWithClickExit):
"""Screen with a dialog to edit a device. Used by the DevicesManager.""" """Screen with a dialog to edit a device. Used by the DevicesManager."""
BINDINGS = [("escape", "close_screen_saving", "Return")] BINDINGS = [("escape", "close_screen_saving", "Return")]
def __init__(self, device: Element, **kwargs) -> None: def __init__(self, device: Element, **kwargs) -> None:
@@ -397,18 +565,29 @@ class EditDevice(ModalWithClickExit):
yield Input(placeholder="Device name", id="device-name-input", value=name) yield Input(placeholder="Device name", id="device-name-input", value=name)
yield Label("Device screen id") yield Label("Device screen id")
with Grid(id="device-id-container"): with Grid(id="device-id-container"):
yield Input(placeholder="Device id", id="device-id-input", value=self.device_data["screen_id"], yield Input(
password=True) placeholder="Device id",
id="device-id-input",
value=self.device_data["screen_id"],
password=True,
)
yield Button("Show id", id="device-id-view") yield Button("Show id", id="device-id-view")
yield Label("Device offset (in milliseconds)") yield Label("Device offset (in milliseconds)")
with Horizontal(id="device-offset-container"): with Horizontal(id="device-offset-container"):
yield Input(id="device-offset-input", value=str(offset)) yield Input(id="device-offset-input", value=str(offset))
yield Slider(name="Device offset", id="device-offset-slider", min=0, max=2000, step=100, value=offset) yield Slider(
name="Device offset",
id="device-offset-slider",
min=0,
max=2000,
step=100,
value=offset,
)
def on_slider_changed(self, event: Slider.Changed) -> None: def on_slider_changed(self, event: Slider.Changed) -> None:
input = self.query_one("#device-offset-input") offset_input = self.query_one("#device-offset-offset_input")
with input.prevent(Input.Changed): with offset_input.prevent(Input.Changed):
input.value = str(event.slider.value) offset_input.value = str(event.slider.value)
def on_input_changed(self, event: Input.Changed): def on_input_changed(self, event: Input.Changed):
if event.input.id == "device-offset-input": if event.input.id == "device-offset-input":
@@ -430,7 +609,8 @@ class EditDevice(ModalWithClickExit):
class DevicesManager(Vertical): class DevicesManager(Vertical):
"""Manager for devices, allows to add, edit and remove devices.""" """Manager for devices, allows adding, edit and removing devices."""
def __init__(self, config, **kwargs) -> None: def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs) super().__init__(**kwargs)
self.config = config self.config = config
@@ -452,7 +632,8 @@ class DevicesManager(Vertical):
self.mount(device_widget) self.mount(device_widget)
device_widget.focus(scroll_visible=True) device_widget.focus(scroll_visible=True)
def edit_device(self, device_widget: Element) -> None: @staticmethod
def edit_device(device_widget: Element) -> None:
device_widget.process_values_from_data() device_widget.process_values_from_data()
device_widget.query_one("#element-name").label = device_widget.element_name device_widget.query_one("#element-name").label = device_widget.element_name
@@ -474,6 +655,7 @@ class DevicesManager(Vertical):
class ApiKeyManager(Vertical): class ApiKeyManager(Vertical):
"""Manager for the YouTube Api Key.""" """Manager for the YouTube Api Key."""
def __init__(self, config, **kwargs) -> None: def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs) super().__init__(**kwargs)
self.config = config self.config = config
@@ -481,18 +663,23 @@ class ApiKeyManager(Vertical):
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Label("YouTube Api Key", classes="title") yield Label("YouTube Api Key", classes="title")
yield Label( yield Label(
"You can get a YouTube Data API v3 Key from the [link=https://console.developers.google.com/apis/credentials]Google Cloud Console[/link]. This key is only required if you're whitelisting channels.") "You can get a YouTube Data API v3 Key from the"
" [link=https://console.developers.google.com/apis/credentials]Google Cloud"
" Console[/link]. This key is only required if you're whitelisting"
" channels."
)
with Grid(id="api-key-grid"): with Grid(id="api-key-grid"):
yield Input(placeholder="YouTube Api Key", id="api-key-input", password=True, value=self.config.apikey) yield Input(
placeholder="YouTube Api Key",
id="api-key-input",
password=True,
value=self.config.apikey,
)
yield Button("Show key", id="api-key-view") yield Button("Show key", id="api-key-view")
@on(Input.Changed, "#api-key-input") @on(Input.Changed, "#api-key-input")
def changed_api_key(self, event: Input.Changed): def changed_api_key(self, event: Input.Changed):
self.config.apikey = event.input.value self.config.apikey = event.input.value
# try: # ChannelWhitelist might not be mounted
# self.app.query_one("#warning-no-key").display = not self.config.apikey
# except:
# pass
@on(Button.Pressed, "#api-key-view") @on(Button.Pressed, "#api-key-view")
def pressed_api_key_view(self, event: Button.Pressed): def pressed_api_key_view(self, event: Button.Pressed):
@@ -505,7 +692,8 @@ class ApiKeyManager(Vertical):
class SkipCategoriesManager(Vertical): class SkipCategoriesManager(Vertical):
"""Manager for skip categories, allows to select which categories to skip.""" """Manager for skip categories, allows selecting which categories to skip."""
def __init__(self, config, **kwargs) -> None: def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs) super().__init__(**kwargs)
self.config = config self.config = config
@@ -530,6 +718,7 @@ class SkipCategoriesManager(Vertical):
class SkipCountTrackingManager(Vertical): class SkipCountTrackingManager(Vertical):
"""Manager for skip count tracking, allows to enable/disable skip count tracking.""" """Manager for skip count tracking, allows to enable/disable skip count tracking."""
def __init__(self, config, **kwargs) -> None: def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs) super().__init__(**kwargs)
self.config = config self.config = config
@@ -537,10 +726,22 @@ class SkipCountTrackingManager(Vertical):
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Label("Skip count tracking", classes="title") yield Label("Skip count tracking", classes="title")
yield Label( yield Label(
"This feature tracks which segments you have skipped to let users know how much their submission has helped others and used as a metric along with upvotes to ensure that spam doesn't get into the database. The program sends a message to the sponsor block server each time you skip a segment. Hopefully most people don't change this setting so that the view numbers are accurate. :)", (
classes="subtitle", id="skip-count-tracking-subtitle") "This feature tracks which segments you have skipped to let users know"
yield Checkbox(value=self.config.skip_count_tracking, id="skip-count-tracking-switch", " how much their submission has helped others and used as a metric"
label="Enable skip count tracking") " along with upvotes to ensure that spam doesn't get into the database."
" The program sends a message to the sponsor block server each time you"
" skip a segment. Hopefully most people don't change this setting so"
" that the view numbers are accurate. :)"
),
classes="subtitle",
id="skip-count-tracking-subtitle",
)
yield Checkbox(
value=self.config.skip_count_tracking,
id="skip-count-tracking-switch",
label="Enable skip count tracking",
)
@on(Checkbox.Changed, "#skip-count-tracking-switch") @on(Checkbox.Changed, "#skip-count-tracking-switch")
def changed_skip_tracking(self, event: Checkbox.Changed): def changed_skip_tracking(self, event: Checkbox.Changed):
@@ -549,6 +750,7 @@ class SkipCountTrackingManager(Vertical):
class AdSkipMuteManager(Vertical): class AdSkipMuteManager(Vertical):
"""Manager for ad skip/mute, allows to enable/disable ad skip/mute.""" """Manager for ad skip/mute, allows to enable/disable ad skip/mute."""
def __init__(self, config, **kwargs) -> None: def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs) super().__init__(**kwargs)
self.config = config self.config = config
@@ -556,13 +758,25 @@ class AdSkipMuteManager(Vertical):
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Label("Skip/Mute ads", classes="title") yield Label("Skip/Mute ads", classes="title")
yield Label( yield Label(
"This feature allows you to automatically mute and/or skip native YouTube ads. Skipping ads only works if that ad shows the 'Skip Ad' button, if it doesn't then it will only be able to be muted.", (
classes="subtitle", id="skip-count-tracking-subtitle") "This feature allows you to automatically mute and/or skip native"
" YouTube ads. Skipping ads only works if that ad shows the 'Skip Ad'"
" button, if it doesn't then it will only be able to be muted."
),
classes="subtitle",
id="skip-count-tracking-subtitle",
)
with Horizontal(id="ad-skip-mute-container"): with Horizontal(id="ad-skip-mute-container"):
yield Checkbox(value=self.config.skip_ads, id="skip-ads-switch", yield Checkbox(
label="Enable skipping ads") value=self.config.skip_ads,
yield Checkbox(value=self.config.mute_ads, id="mute-ads-switch", id="skip-ads-switch",
label="Enable muting ads") label="Enable skipping ads",
)
yield Checkbox(
value=self.config.mute_ads,
id="mute-ads-switch",
label="Enable muting ads",
)
@on(Checkbox.Changed, "#mute-ads-switch") @on(Checkbox.Changed, "#mute-ads-switch")
def changed_mute(self, event: Checkbox.Changed): def changed_mute(self, event: Checkbox.Changed):
@@ -574,7 +788,8 @@ class AdSkipMuteManager(Vertical):
class ChannelWhitelistManager(Vertical): class ChannelWhitelistManager(Vertical):
"""Manager for channel whitelist, allows to add/remove channels from the whitelist.""" """Manager for channel whitelist, allows adding/removing channels from the whitelist."""
def __init__(self, config, **kwargs) -> None: def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs) super().__init__(**kwargs)
self.config = config self.config = config
@@ -582,17 +797,31 @@ class ChannelWhitelistManager(Vertical):
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Label("Channel Whitelist", classes="title") yield Label("Channel Whitelist", classes="title")
yield Label( yield Label(
"This feature allows to whitelist channels from being skipped. This feature is automatically disabled when no channels have been specified.", (
classes="subtitle", id="channel-whitelist-subtitle") "This feature allows to whitelist channels from being skipped. This"
yield Label(":warning: [#FF0000]You need to set your YouTube Api Key in order to use this feature", " feature is automatically disabled when no channels have been"
id="warning-no-key") " specified."
),
classes="subtitle",
id="channel-whitelist-subtitle",
)
yield Label(
(
":warning: [#FF0000]You need to set your YouTube Api Key in order to"
" use this feature"
),
id="warning-no-key",
)
with Horizontal(id="add-channel-button-container"): with Horizontal(id="add-channel-button-container"):
yield Button("Add Channel", id="add-channel", classes="button-100") yield Button("Add Channel", id="add-channel", classes="button-100")
for channel in self.config.channel_whitelist: for channel in self.config.channel_whitelist:
yield Channel(channel) yield Channel(channel)
def on_mount(self) -> None: def on_mount(self) -> None:
self.app.query_one("#warning-no-key").display = (not self.config.apikey) and bool(self.config.channel_whitelist) self.app.query_one("#warning-no-key").display = (
not self.config.apikey
) and bool(self.config.channel_whitelist)
def new_channel(self, channel: tuple) -> None: def new_channel(self, channel: tuple) -> None:
if channel: if channel:
channel_dict = { channel_dict = {
@@ -603,30 +832,30 @@ class ChannelWhitelistManager(Vertical):
channel_widget = Channel(channel_dict) channel_widget = Channel(channel_dict)
self.mount(channel_widget) self.mount(channel_widget)
channel_widget.focus(scroll_visible=True) channel_widget.focus(scroll_visible=True)
self.app.query_one("#warning-no-key").display = (not self.config.apikey) and bool( self.app.query_one("#warning-no-key").display = (
self.config.channel_whitelist) not self.config.apikey
) and bool(self.config.channel_whitelist)
@on(Button.Pressed, "#element-remove") @on(Button.Pressed, "#element-remove")
def remove_channel(self, event: Button.Pressed): def remove_channel(self, event: Button.Pressed):
channel_to_remove: Element = event.button.parent channel_to_remove: Element = event.button.parent
self.config.channel_whitelist.remove(channel_to_remove.element_data) self.config.channel_whitelist.remove(channel_to_remove.element_data)
channel_to_remove.remove() channel_to_remove.remove()
self.app.query_one("#warning-no-key").display = (not self.config.apikey) and bool( self.app.query_one("#warning-no-key").display = (
self.config.channel_whitelist) not self.config.apikey
) and bool(self.config.channel_whitelist)
@on(Button.Pressed, "#add-channel") @on(Button.Pressed, "#add-channel")
def add_channel(self, event: Button.Pressed): def add_channel(self, event: Button.Pressed):
self.app.push_screen(AddChannel(self.config), callback=self.new_channel) self.app.push_screen(AddChannel(self.config), callback=self.new_channel)
class iSponsorBlockTVSetupMainScreen(Screen): class ISponsorBlockTVSetupMainScreen(Screen):
"""Making this a separate screen to avoid a bug: https://github.com/Textualize/textual/issues/3221""" """Making this a separate screen to avoid a bug: https://github.com/Textualize/textual/issues/3221"""
TITLE = "iSponsorBlockTV" TITLE = "iSponsorBlockTV"
SUB_TITLE = "Setup Wizard" SUB_TITLE = "Setup Wizard"
BINDINGS = [ BINDINGS = [("q,ctrl+c", "exit_modal", "Exit"), ("s", "save", "Save")]
("q,ctrl+c", "exit_modal", "Exit"),
("s", "save", "Save")
]
AUTO_FOCUS = None AUTO_FOCUS = None
def __init__(self, config, **kwargs) -> None: def __init__(self, config, **kwargs) -> None:
@@ -639,12 +868,24 @@ class iSponsorBlockTVSetupMainScreen(Screen):
yield Header() yield Header()
yield Footer() yield Footer()
with ScrollableContainer(id="setup-wizard"): with ScrollableContainer(id="setup-wizard"):
yield DevicesManager(config=self.config, id="devices-manager", classes="container") yield DevicesManager(
yield SkipCategoriesManager(config=self.config, id="skip-categories-manager", classes="container") config=self.config, id="devices-manager", classes="container"
yield SkipCountTrackingManager(config=self.config, id="count-segments-manager", classes="container") )
yield AdSkipMuteManager(config=self.config, id="ad-skip-mute-manager", classes="container") yield SkipCategoriesManager(
yield ChannelWhitelistManager(config=self.config, id="channel-whitelist-manager", classes="container") config=self.config, id="skip-categories-manager", classes="container"
yield ApiKeyManager(config=self.config, id="api-key-manager", classes="container") )
yield SkipCountTrackingManager(
config=self.config, id="count-segments-manager", classes="container"
)
yield AdSkipMuteManager(
config=self.config, id="ad-skip-mute-manager", classes="container"
)
yield ChannelWhitelistManager(
config=self.config, id="channel-whitelist-manager", classes="container"
)
yield ApiKeyManager(
config=self.config, id="api-key-manager", classes="container"
)
def on_mount(self) -> None: def on_mount(self) -> None:
if self.check_for_old_config_entries(): if self.check_for_old_config_entries():
@@ -668,25 +909,26 @@ class iSponsorBlockTVSetupMainScreen(Screen):
@on(Input.Changed, "#api-key-input") @on(Input.Changed, "#api-key-input")
def changed_api_key(self, event: Input.Changed): def changed_api_key(self, event: Input.Changed):
print("HIIII")
try: # ChannelWhitelist might not be mounted try: # ChannelWhitelist might not be mounted
# Show if no api key is set and at least one channel is in the whitelist # Show if no api key is set and at least one channel is in the whitelist
self.app.query_one("#warning-no-key").display = (not event.input.value) and self.config.channel_whitelist self.app.query_one("#warning-no-key").display = (
not event.input.value
) and self.config.channel_whitelist
except: except:
pass pass
class iSponsorBlockTVSetup(App):
CSS_PATH = "setup-wizard-style.tcss" # tcss is the recommended extension for textual css files class ISponsorBlockTVSetup(App):
CSS_PATH = ( # tcss is the recommended extension for textual css files
"setup-wizard-style.tcss"
)
# Bindings for the whole app here, so they are available in all screens # Bindings for the whole app here, so they are available in all screens
BINDINGS = [ BINDINGS = [("q,ctrl+c", "exit_modal", "Exit"), ("s", "save", "Save")]
("q,ctrl+c", "exit_modal", "Exit"),
("s", "save", "Save")
]
def __init__(self, config, **kwargs) -> None: def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs) super().__init__(**kwargs)
self.config = config self.config = config
self.main_screen = iSponsorBlockTVSetupMainScreen(config=self.config) self.main_screen = ISponsorBlockTVSetupMainScreen(config=self.config)
def on_mount(self) -> None: def on_mount(self) -> None:
self.push_screen(self.main_screen) self.push_screen(self.main_screen)
@@ -699,5 +941,5 @@ class iSponsorBlockTVSetup(App):
def main(config): def main(config):
app = iSponsorBlockTVSetup(config) app = ISponsorBlockTVSetup(config)
app.run() app.run()

View File

@@ -1,19 +1,26 @@
import asyncio import asyncio
import json import json
import aiohttp
import pyytlounge
from .constants import youtube_client_blacklist
# Temporary imports import pyytlounge
from pyytlounge.api import api_base from aiohttp import ClientSession
from pyytlounge.wrapper import NotLinkedException, desync
from .constants import youtube_client_blacklist
create_task = asyncio.create_task create_task = asyncio.create_task
class YtLoungeApi(pyytlounge.YtLoungeApi): class YtLoungeApi(pyytlounge.YtLoungeApi):
def __init__(self, screen_id, config=None, api_helper=None): def __init__(
super().__init__("iSponsorBlockTV") self,
screen_id,
config=None,
api_helper=None,
logger=None,
web_session: ClientSession = None,
):
super().__init__("iSponsorBlockTV", logger=logger)
if web_session is not None:
self.session = web_session # And use the one we passed
self.auth.screen_id = screen_id self.auth.screen_id = screen_id
self.auth.lounge_id_token = None self.auth.lounge_id_token = None
self.api_helper = api_helper self.api_helper = api_helper
@@ -21,13 +28,17 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
self.subscribe_task = None self.subscribe_task = None
self.subscribe_task_watchdog = None self.subscribe_task_watchdog = None
self.callback = None self.callback = None
self.logger = logger
self.shorts_disconnected = False
if config: if config:
self.mute_ads = config.mute_ads self.mute_ads = config.mute_ads
self.skip_ads = config.skip_ads self.skip_ads = config.skip_ads
# Ensures that we still are subscribed to the lounge # Ensures that we still are subscribed to the lounge
async def _watchdog(self): async def _watchdog(self):
await asyncio.sleep(35) # YouTube sends at least a message every 30 seconds (no-op or any other) await asyncio.sleep(
35
) # YouTube sends at least a message every 30 seconds (no-op or any other)
try: try:
self.subscribe_task.cancel() self.subscribe_task.cancel()
except Exception: except Exception:
@@ -46,7 +57,7 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
# Process a lounge subscription event # Process a lounge subscription event
def _process_event(self, event_id: int, event_type: str, args): def _process_event(self, event_id: int, event_type: str, args):
# print(f"YtLoungeApi.__process_event({event_id}, {event_type}, {args})") self.logger.debug(f"process_event({event_id}, {event_type}, {args})")
# (Re)start the watchdog # (Re)start the watchdog
try: try:
self.subscribe_task_watchdog.cancel() self.subscribe_task_watchdog.cancel()
@@ -64,43 +75,55 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
create_task(self.mute(False, override=True)) create_task(self.mute(False, override=True))
elif event_type == "nowPlaying": elif event_type == "nowPlaying":
data = args[0] data = args[0]
self.state = pyytlounge.PlaybackState(self._logger, data)
self._update_state()
# Unmute when the video starts playing # Unmute when the video starts playing
if self.mute_ads and data.get("state", "0") == "1": if self.mute_ads and data.get("state", "0") == "1":
# print("Ad has ended, unmuting") self.logger.info("Ad has ended, unmuting")
create_task(self.mute(False, override=True)) create_task(self.mute(False, override=True))
elif event_type == "onAdStateChange": elif event_type == "onAdStateChange":
data = args[0] data = args[0]
if data["adState"] == '0': # Ad is not playing if data["adState"] == "0": # Ad is not playing
# print("Ad has ended, unmuting") self.logger.info("Ad has ended, unmuting")
create_task(self.mute(False, override=True)) create_task(self.mute(False, override=True))
elif self.skip_ads and data["isSkipEnabled"] == "true": # YouTube uses strings for booleans elif (
print("Ad can be skipped, skipping") self.skip_ads and data["isSkipEnabled"] == "true"
): # YouTube uses strings for booleans
self.logger.info("Ad can be skipped, skipping")
create_task(self.skip_ad()) create_task(self.skip_ad())
create_task(self.mute(False, override=True)) create_task(self.mute(False, override=True))
elif self.mute_ads: # Seen multiple other adStates, assuming they are all ads elif (
print("Ad has started, muting") self.mute_ads
): # Seen multiple other adStates, assuming they are all ads
self.logger.info("Ad has started, muting")
create_task(self.mute(True, override=True)) create_task(self.mute(True, override=True))
# Manages volume, useful since YouTube wants to know the volume when unmuting (even if they already have it) # Manages volume, useful since YouTube wants to know the volume when unmuting (even if they already have it)
elif event_type == "onVolumeChanged": elif event_type == "onVolumeChanged":
self.volume_state = args[0] self.volume_state = args[0]
pass pass
# Gets segments for the next video before it starts playing # Gets segments for the next video before it starts playing
# Comment "fix" since it doesn't seem to work elif event_type == "autoplayUpNext":
# elif event_type == "autoplayUpNext": if len(args) > 0 and (
# if len(args) > 0 and (vid_id := args[0]["videoId"]): # if video id is not empty vid_id := args[0]["videoId"]
# print(f"Getting segments for next video: {vid_id}") ): # if video id is not empty
# create_task(self.api_helper.get_segments(vid_id)) self.logger.info(f"Getting segments for next video: {vid_id}")
create_task(self.api_helper.get_segments(vid_id))
# #Used to know if an ad is skippable or not # #Used to know if an ad is skippable or not
elif event_type == "adPlaying": elif event_type == "adPlaying":
data = args[0] data = args[0]
# Gets segments for the next video (after the ad) before it starts playing # Gets segments for the next video (after the ad) before it starts playing
if vid_id := data["contentVideoId"]: if vid_id := data["contentVideoId"]:
print(f"Getting segments for next video: {vid_id}") self.logger.info(f"Getting segments for next video: {vid_id}")
create_task(self.api_helper.get_segments(vid_id)) create_task(self.api_helper.get_segments(vid_id))
if self.mute_ads: elif (
self.skip_ads and data["isSkipEnabled"] == "true"
): # YouTube uses strings for booleans
self.logger.info("Ad can be skipped, skipping")
create_task(self.skip_ad())
create_task(self.mute(False, override=True))
elif (
self.mute_ads
): # Seen multiple other adStates, assuming they are all ads
self.logger.info("Ad has started, muting")
create_task(self.mute(True, override=True)) create_task(self.mute(True, override=True))
elif event_type == "loungeStatus": elif event_type == "loungeStatus":
@@ -112,9 +135,18 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
if device_info.get("clientName", "") in youtube_client_blacklist: if device_info.get("clientName", "") in youtube_client_blacklist:
self._sid = None self._sid = None
self._gsession = None # Force disconnect self._gsession = None # Force disconnect
# elif event_type == "onAutoplayModeChanged":
# data = args[0] elif event_type == "onSubtitlesTrackChanged":
# create_task(self.set_auto_play_mode(data["autoplayMode"] == "ENABLED")) if self.shorts_disconnected:
data = args[0]
video_id_saved = data.get("videoId", None)
self.shorts_disconnected = False
create_task(self.play_video(video_id_saved))
elif event_type == "loungeScreenDisconnected":
data = args[0]
if data["reason"] == "disconnectedByUserScreenInitiated": # Short playing?
self.shorts_disconnected = True
super()._process_event(event_id, event_type, args) super()._process_event(event_id, event_type, args)
# Set the volume to a specific value (0-100) # Set the volume to a specific value (0-100)
@@ -133,52 +165,15 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
if override or not (self.volume_state.get("muted", "false") == mute_str): if override or not (self.volume_state.get("muted", "false") == mute_str):
self.volume_state["muted"] = mute_str self.volume_state["muted"] = mute_str
# YouTube wants the volume when unmuting, so we send it # YouTube wants the volume when unmuting, so we send it
await super()._command("setVolume", {"volume": self.volume_state.get("volume", 100), "muted": mute_str}) await super()._command(
"setVolume",
{"volume": self.volume_state.get("volume", 100), "muted": mute_str},
)
async def set_auto_play_mode(self, enabled: bool): async def set_auto_play_mode(self, enabled: bool):
await super()._command("setAutoplayMode", {"autoplayMode": "ENABLED" if enabled else "DISABLED"}) await super()._command(
"setAutoplayMode", {"autoplayMode": "ENABLED" if enabled else "DISABLED"}
# Here just temporarily, will be removed once the PR is merged on YTlounge
async def connect(self) -> bool:
"""Attempt to connect using the previously set tokens"""
if not self.linked():
raise NotLinkedException("Not linked")
connect_body = {
"app": "web",
"mdx-version": "3",
"name": self.device_name,
"id": self.auth.screen_id,
"device": "REMOTE_CONTROL",
"capabilities": "que,dsdtr,atp",
"method": "setPlaylist",
"magnaKey": "cloudPairedDevice",
"ui": "false",
"deviceContext": "user_agent=dunno&window_width_points=&window_height_points=&os_name=android&ms=",
"theme": "cl",
"loungeIdToken": self.auth.lounge_id_token,
}
connect_url = (
f"{api_base}/bc/bind?RID=1&VER=8&CVER=1&auth_failure_option=send_error"
) )
async with aiohttp.ClientSession() as session:
async with session.post(url=connect_url, data=connect_body) as resp:
try:
text = await resp.text()
if resp.status == 401:
self.auth.lounge_id_token = None
return False
if resp.status != 200: async def play_video(self, video_id: str) -> bool:
self._logger.warning( return await self._command("setPlaylist", {"videoId": video_id})
"Unknown reply to connect %i %s", resp.status, resp.reason
)
return False
lines = text.splitlines()
async for events in self._parse_event_chunks(desync(lines)):
self._process_events(events)
self._command_offset = 1
return self.connected()
except Exception as ex:
self._logger.exception(ex, resp.status, resp.reason)
return False