first v2 commit

This commit is contained in:
dmunozv04
2023-09-04 14:11:34 +02:00
parent 1aa06e677f
commit 359a7f7be1
18 changed files with 1899 additions and 392 deletions

View File

@@ -1,17 +1,15 @@
# This is a basic workflow to help you get started with Actions
name: build docker images name: build docker images
# Controls when the workflow will run # Controls when the workflow will run
on: on:
push: push:
branches: branches:
- 'main' - '*'
tags: tags:
- 'v*' - 'v*'
pull_request: pull_request:
branches: branches:
- 'main' - '*'
workflow_dispatch: workflow_dispatch:
@@ -19,52 +17,53 @@ permissions:
contents: read contents: read
packages: write packages: write
# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs: jobs:
build: build:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
# Get the repository's code # Get the repository's code
- name: Checkout - name: Checkout
uses: actions/checkout@v2 uses: actions/checkout@v3
# Generate docker tags # Generate docker tags
- name: Docker meta - name: Docker meta
id: meta id: meta
uses: docker/metadata-action@v3 uses: docker/metadata-action@v4
with: with:
images: ghcr.io/dmunozv04/isponsorblocktv, dmunozv04/isponsorblocktv images: ghcr.io/dmunozv04/isponsorblocktv, dmunozv04/isponsorblocktv
tags: | # tags: |
type=raw,value=latest,priority=900,enable=${{ github.ref == format('refs/heads/{0}', github.event.repository.default_branch) }} # type=raw,value=latest,priority=900,enable=${{ github.ref == format('refs/heads/{0}', github.event.repository.default_branch) }}
type=ref,enable=true,priority=600,prefix=pr-,suffix=,event=pr # type=ref,enable=true,priority=600,prefix=pr-,suffix=,event=pr
# https://github.com/docker/setup-qemu-action # https://github.com/docker/setup-qemu-action
- name: Set up QEMU - name: Set up QEMU
uses: docker/setup-qemu-action@v1 uses: docker/setup-qemu-action@v2
# https://github.com/docker/setup-buildx-action # https://github.com/docker/setup-buildx-action
- name: Set up Docker Buildx - name: Set up Docker Buildx
id: buildx id: buildx
uses: docker/setup-buildx-action@v1 uses: docker/setup-buildx-action@v2
- name: Login to DockerHub - name: Login to DockerHub
uses: docker/login-action@v1 if: github.event_name != 'pull_request'
uses: docker/login-action@v2
with: with:
username: ${{ secrets.DOCKERHUB_USERNAME }} username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }} password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Login to GHCR - name: Login to GHCR
uses: docker/login-action@v1 if: github.event_name != 'pull_request'
uses: docker/login-action@v2
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.repository_owner }} username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push - name: Build and push
uses: docker/build-push-action@v2 uses: docker/build-push-action@v4
with: with:
context: . context: .
platforms: linux/amd64, linux/arm64 platforms: linux/amd64, linux/arm64, linux/arm/v7
push: true push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }} tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }} labels: ${{ steps.meta.outputs.labels }}
cache-from: type=registry,ref=ghcr.io/dmunozv04/isponsorblocktv:buildcache cache-from: type=registry,ref=ghcr.io/dmunozv04/isponsorblocktv:buildcache

4
.gitignore vendored
View File

@@ -154,7 +154,9 @@ cython_debug/
#.idea/ #.idea/
#config folder #config folder
config/ data/
config.json config.json
.DS_Store
.DS_Store .DS_Store

View File

@@ -1,19 +1,13 @@
# syntax=docker/dockerfile:1 # syntax=docker/dockerfile:1
FROM python:alpine FROM python:alpine3.11
RUN python -m venv /opt/venv ENV PIP_NO_CACHE_DIR=off iSPBTV_docker=True TERM=xterm-256color COLORTERM=truecolor
ENV PATH="/opt/venv/bin:$PATH" PIP_NO_CACHE_DIR=off iSPBTV_docker=True
COPY requirements.txt . COPY requirements.txt .
RUN apk add gcc musl-dev build-base linux-headers libffi-dev rust cargo openssl-dev git avahi && \ RUN pip install --upgrade pip wheel && \
pip install --upgrade pip setuptools-rust wheel && \ pip install -r requirements.txt
pip install -r requirements.txt && \
apk del gcc musl-dev build-base linux-headers libffi-dev rust cargo openssl-dev git && \
rm -rf /root/.cache /root/.cargo
COPY requirements.txt . COPY requirements.txt .
@@ -21,4 +15,4 @@ WORKDIR /app
COPY . . COPY . .
ENTRYPOINT ["/opt/venv/bin/python3", "-u", "main.py"] ENTRYPOINT ["python3", "-u", "main.py"]

View File

@@ -1,8 +1,21 @@
{ {
"atvs": [ "devices": [
{"identifier": "", "airplay_credentials": ""} {
"screen_id": "",
"name": "YouTube on TV",
"offset": 0
}
], ],
"apikey":"", "skip_categories": [
"skip_categories": ["sponsor"], "sponsor"
"channel_whitelist": [] ],
"skip_count_tracking": true,
"mute_ads": true,
"skip_ads": true,
"apikey": "",
"channel_whitelist": [
{"id": "",
"name": ""
}
]
} }

8
docker-compose.yml Normal file
View File

@@ -0,0 +1,8 @@
version: '3.3'
services:
iSponsorBlockTV:
image: ghcr.io/dmunozv04/isponsorblocktv
container_name: iSponsorBlockTV
restart: unless-stopped
volumes:
- /PATH_TO_YOUR_DATA_DIR:/app/data

View File

@@ -1,7 +1,9 @@
from cache import AsyncTTL, AsyncLRU from cache import AsyncTTL, AsyncLRU
from . import constants from .conditional_ttl_cache import AsyncConditionalTTL
from . import constants, dial_client
from hashlib import sha256 from hashlib import sha256
from asyncio import create_task from asyncio import create_task
from aiohttp import ClientSession
import html import html
@@ -15,107 +17,144 @@ def listToTuple(function):
return wrapper return wrapper
@AsyncLRU(maxsize=10) # Class that handles all the api calls and their cache
async def get_vid_id(title, artist, api_key, web_session): class ApiHelper:
params = {"q": title + " " + artist, "key": api_key, "part": "snippet"} def __init__(self, config, web_session: ClientSession) -> None:
url = constants.Youtube_api + "search" self.apikey = config.apikey
async with web_session.get(url, params=params) as resp: self.skip_categories = config.skip_categories
data = await resp.json() self.channel_whitelist = config.channel_whitelist
self.skip_count_tracking = config.skip_count_tracking
if "error" in data: self.web_session = web_session
self.num_devices = len(config.devices)
# Not used anymore, maybe it can stay here a little longer
@AsyncLRU(maxsize=10)
async def get_vid_id(self, title, artist, api_key, web_session):
params = {"q": title + " " + artist, "key": api_key, "part": "snippet"}
url = constants.Youtube_api + "search"
async with web_session.get(url, params=params) as resp:
data = await resp.json()
if "error" in data:
return
for i in data["items"]:
if (i["id"]["kind"] != "youtube#video"):
continue
title_api = html.unescape(i["snippet"]["title"])
artist_api = html.unescape(i["snippet"]["channelTitle"])
if title_api == title and artist_api == artist:
return (i["id"]["videoId"], i["snippet"]["channelId"])
return return
for i in data["items"]: @AsyncLRU(maxsize=100)
if (i["id"]["kind"] != "youtube#video"): async def is_whitelisted(self, vid_id):
continue if (self.apikey and self.channel_whitelist):
title_api = html.unescape(i["snippet"]["title"]) channel_id = await self.__get_channel_id(vid_id)
artist_api = html.unescape(i["snippet"]["channelTitle"]) # check if channel id is in whitelist
if title_api == title and artist_api == artist: for i in self.channel_whitelist:
return (i["id"]["videoId"], i["snippet"]["channelId"]) if i["id"] == channel_id:
return return True
return False
@AsyncLRU(maxsize=10) async def __get_channel_id(self, vid_id):
async def search_channels(channel, api_key, web_session): params = {"id": vid_id, "key": self.apikey, "part": "snippet"}
channels = [] url = constants.Youtube_api + "videos"
params = {"q": channel, "key": api_key, "part": "snippet", "type": "channel", "maxResults": "5"} async with self.web_session.get(url, params=params) as resp:
url = constants.Youtube_api + "search" data = await resp.json()
async with web_session.get(url, params=params) as resp:
data = await resp.json()
if "error" in data: if "error" in data:
return
data = data["items"][0]
if (data["kind"] != "youtube#video"):
return
return data["snippet"]["channelId"]
@AsyncLRU(maxsize=10)
async def search_channels(self, channel):
channels = []
params = {"q": channel, "key": self.apikey, "part": "snippet", "type": "channel", "maxResults": "5"}
url = constants.Youtube_api + "search"
async with self.web_session.get(url, params=params) as resp:
data = await resp.json()
if "error" in data:
return channels
for i in data["items"]:
# Get channel subcription number
params = {"id": i["snippet"]["channelId"], "key": self.apikey, "part": "statistics"}
url = constants.Youtube_api + "channels"
async with self.web_session.get(url, params=params) as resp:
channelData = await resp.json()
if channelData["items"][0]["statistics"]["hiddenSubscriberCount"]:
subCount = "Hidden"
else:
subCount = int(channelData["items"][0]["statistics"]["subscriberCount"])
subCount = format(subCount, "_")
channels.append((i["snippet"]["channelId"], i["snippet"]["channelTitle"], subCount))
return channels return channels
for i in data["items"]: @listToTuple # Convert list to tuple so it can be used as a key in the cache
# Get channel subcription number @AsyncConditionalTTL(time_to_live=300, maxsize=10) # 5 minutes for non-locked segments
params = {"id": i["snippet"]["channelId"], "key": api_key, "part": "statistics"} async def get_segments(self, vid_id):
url = constants.Youtube_api + "channels" if await self.is_whitelisted(vid_id):
async with web_session.get(url, params=params) as resp: print("Video is whitelisted")
channelData = await resp.json() return ([], True) # Return empty list and True to indicate that the cache should last forever
vid_id_hashed = sha256(vid_id.encode("utf-8")).hexdigest()[
:4
] # Hashes video id and gets the first 4 characters
params = {
"category": self.skip_categories,
"actionType": constants.SponsorBlock_actiontype,
"service": constants.SponsorBlock_service,
}
headers = {"Accept": "application/json"}
url = constants.SponsorBlock_api + "skipSegments/" + vid_id_hashed
async with self.web_session.get(url, headers=headers, params=params) as response:
response = await response.json()
for i in response:
if str(i["videoID"]) == str(vid_id):
response = i
break
segments = []
ignore_ttl = True
try:
for i in response["segments"]:
ignore_ttl = ignore_ttl and i["locked"] == 1 # If all segments are locked, ignore ttl
segment = i["segment"]
UUID = i["UUID"]
segment_dict = {"start": segment[0], "end": segment[1], "UUID": [UUID]}
try:
# Get segment before to check if they are too close to each other
segment_before_end = segments[-1]["end"]
segment_before_start = segments[-1]["start"]
segment_before_UUID = segments[-1]["UUID"]
if channelData["items"][0]["statistics"]["hiddenSubscriberCount"]: except Exception:
subCount = "Hidden" segment_before_end = -10
else: if (
subCount = channelData["items"][0]["statistics"]["subscriberCount"] segment_dict["start"] - segment_before_end < 1
): # Less than 1 second appart, combine them and skip them together
segment_dict["start"] = segment_before_start
segment_dict["UUID"].append(segment_before_UUID)
segments.pop()
segments.append(segment_dict)
except Exception:
pass
return (segments, ignore_ttl)
channels.append((i["snippet"]["channelId"], i["snippet"]["channelTitle"], subCount)) async def mark_viewed_segments(self, UUID):
"""Marks the segments as viewed in the SponsorBlock API, if skip_count_tracking is enabled. Lets the contributor know that someone skipped the segment (thanks)"""
if self.skip_count_tracking:
for i in UUID:
url = constants.SponsorBlock_api + "viewedVideoSponsorTime/"
params = {"UUID": i}
await self.web_session.post(url, params=params)
return channels async def discover_youtube_devices_dial(self):
"""Discovers YouTube devices using DIAL"""
@listToTuple dial_screens = await dial_client.discover(self.web_session)
@AsyncTTL(time_to_live=300, maxsize=5) # print(dial_screens)
async def get_segments(vid_id, web_session, categories=["sponsor"]): return dial_screens
vid_id_hashed = sha256(vid_id.encode("utf-8")).hexdigest()[
:4
] # Hashes video id and get the first 4 characters
params = {
"category": categories,
"actionType": constants.SponsorBlock_actiontype,
"service": constants.SponsorBlock_service,
}
headers = {"Accept": "application/json"}
url = constants.SponsorBlock_api + "skipSegments/" + vid_id_hashed
async with web_session.get(url, headers=headers, params=params) as response:
response = await response.json()
for i in response:
if str(i["videoID"]) == str(vid_id):
response = i
break
segments = []
try:
for i in response["segments"]:
segment = i["segment"]
UUID = i["UUID"]
segment_dict = {"start": segment[0], "end": segment[1], "UUID": [UUID]}
try:
# Get segment before to check if they are too close to each other
segment_before_end = segments[-1]["end"]
segment_before_start = segments[-1]["start"]
segment_before_UUID = segments[-1]["UUID"]
except:
segment_before_end = -10
if (
segment_dict["start"] - segment_before_end < 1
): # Less than 1 second appart, combine them and skip them together
segment_dict["start"] = segment_before_start
segment_dict["UUID"].append(segment_before_UUID)
segments.pop()
segments.append(segment_dict)
except:
pass
return segments
async def viewed_segments(UUID, web_session):
url = constants.SponsorBlock_api + "viewedVideoSponsorTime/"
for i in UUID:
create_task(mark_viewed_segment(i, web_session))
return
async def mark_viewed_segment(UUID, web_session):
url = constants.SponsorBlock_api + "viewedVideoSponsorTime/"
params = {"UUID": UUID}
async with web_session.post(url, params=params) as response:
response_text = await response.text()
return

View File

@@ -0,0 +1,87 @@
"""MIT License
Copyright (c) 2020 Rajat Singh
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE."""
'''Modified code from https://github.com/iamsinghrajat/async-cache'''
from cache.key import KEY
from cache.lru import LRU
import datetime
class AsyncConditionalTTL:
class _TTL(LRU):
def __init__(self, time_to_live, maxsize):
super().__init__(maxsize=maxsize)
self.time_to_live = datetime.timedelta(
seconds=time_to_live
) if time_to_live else None
self.maxsize = maxsize
def __contains__(self, key):
if key not in self.keys():
return False
else:
key_expiration = super().__getitem__(key)[1]
if key_expiration and key_expiration < datetime.datetime.now():
del self[key]
return False
else:
return True
def __getitem__(self, key):
value = super().__getitem__(key)[0]
return value
def __setitem__(self, key, value):
value, ignore_ttl = value # unpack tuple
ttl_value = (
datetime.datetime.now() + self.time_to_live
) if (self.time_to_live and not ignore_ttl) else None # ignore ttl if ignore_ttl is True
super().__setitem__(key, (value, ttl_value))
def __init__(
self, time_to_live=60, maxsize=1024, skip_args: int = 0
):
"""
:param time_to_live: Use time_to_live as None for non expiring cache
:param maxsize: Use maxsize as None for unlimited size cache
:param skip_args: Use `1` to skip first arg of func in determining cache key
"""
self.ttl = self._TTL(time_to_live=time_to_live, maxsize=maxsize)
self.skip_args = skip_args
def __call__(self, func):
async def wrapper(*args, **kwargs):
key = KEY(args[self.skip_args:], kwargs)
if key in self.ttl:
val = self.ttl[key]
else:
self.ttl[key] = await func(*args, **kwargs)
val = self.ttl[key]
return val
wrapper.__name__ += func.__name__
return wrapper

View File

@@ -1,107 +1,67 @@
import pyatv
import json import json
import asyncio import asyncio
from pyatv.const import OperatingSystem
import sys import sys
import aiohttp import aiohttp
import asyncio from . import api_helpers, ytlounge
from . import api_helpers
def save_config(config, config_file): async def pair_device(loop):
with open(config_file, "w") as f: try:
json.dump(config, f) lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV")
pairing_code = input("Enter pairing code (found in Settings - Link with TV code): ")
pairing_code = int(pairing_code.replace("-", "").replace(" ","")) # remove dashes and spaces
# Taken from postlund/pyatv atvremote.py print("Pairing...")
async def _read_input(loop: asyncio.AbstractEventLoop, prompt: str): paired = await lounge_controller.pair(pairing_code)
sys.stdout.write(prompt) if not paired:
sys.stdout.flush() print("Failed to pair device")
user_input = await loop.run_in_executor(None, sys.stdin.readline) return
return user_input.strip() device = {
"screen_id": lounge_controller.auth.screen_id,
"name": lounge_controller.screen_name,
async def find_atvs(loop): }
devices = await pyatv.scan(loop) print(f"Paired device: {device['name']}")
if not devices: return device
print("No devices found") except Exception as e:
print(f"Failed to pair device: {e}")
return return
atvs = []
for i in devices: def main(config, debug: bool) -> None:
# Only get Apple TV's print("Welcome to the iSponsorBlockTV cli setup wizard")
if (
i.device_info.operating_system == OperatingSystem.TvOS
and input(f"Found {i.name}. Do you want to add it? (y/n) ") == "y"
):
identifier = i.identifier
pairing = await pyatv.pair(
i, loop=loop, protocol=pyatv.Protocol.AirPlay
)
await pairing.begin()
if pairing.device_provides_pin:
pin = await _read_input(loop, "Enter PIN on screen: ")
pairing.pin(pin)
await pairing.finish()
if pairing.has_paired:
creds = pairing.service.credentials
atvs.append(
{"identifier": identifier, "airplay_credentials": creds}
)
print("Pairing successful")
await pairing.close()
return atvs
def main(config, config_file, debug):
loop = asyncio.get_event_loop_policy().get_event_loop() loop = asyncio.get_event_loop_policy().get_event_loop()
try: if debug:
num_atvs = len(config["atvs"]) loop.set_debug(True)
except: asyncio.set_event_loop(loop)
num_atvs = 0 if hasattr(config, "atvs"):
if ( print("The atvs config option is deprecated and has stopped working. Please read this for more information on how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2")
input("Found {} Apple TV(s) in config.json. Add more? (y/n) ".format(num_atvs)) if input("Do you want to remove the legacy 'atvs' entry (the app won't start with it present)? (y/n) ") == "y":
== "y" del config["atvs"]
): devices = config.devices
loop = asyncio.get_event_loop_policy().get_event_loop() while not input(f"Paired with {len(devices)} Device(s). Add more? (y/n) ") == "n":
if debug: task = loop.create_task(pair_device(loop))
loop.set_debug(True)
asyncio.set_event_loop(loop)
task = loop.create_task(find_atvs(loop))
loop.run_until_complete(task) loop.run_until_complete(task)
atvs = task.result() device = task.result()
try: if device:
for i in atvs: devices.append(device)
config["atvs"].append(i) config.devices = devices
print("Done adding")
except:
print("Rewriting atvs (don't worry if none were saved before)")
config["atvs"] = atvs
try: apikey = config.apikey
apikey = config["apikey"] if apikey:
except:
apikey = ""
if apikey != "":
if input("API key already specified. Change it? (y/n) ") == "y": if input("API key already specified. Change it? (y/n) ") == "y":
apikey = input("Enter your API key: ") apikey = input("Enter your API key: ")
config["apikey"] = apikey config["apikey"] = apikey
else: else:
print( if input("API key only needed for the channel whitelist function. Add it? (y/n) ") == "y":
"Get youtube apikey here: https://developers.google.com/youtube/registering_an_application" print(
) "Get youtube apikey here: https://developers.google.com/youtube/registering_an_application"
apikey = input("Enter your API key: ") )
config["apikey"] = apikey apikey = input("Enter your API key: ")
config["apikey"] = apikey
try: config.apikey = apikey
skip_categories = config["skip_categories"]
except: skip_categories = config.skip_categories
skip_categories = [] if skip_categories:
if skip_categories != []:
if input("Skip categories already specified. Change them? (y/n) ") == "y": if input("Skip categories already specified. Change them? (y/n) ") == "y":
categories = input( categories = input(
"Enter skip categories (space or comma sepparated) Options: [sponsor selfpromo exclusive_access interaction poi_highlight intro outro preview filler music_offtopic:\n" "Enter skip categories (space or comma sepparated) Options: [sponsor selfpromo exclusive_access interaction poi_highlight intro outro preview filler music_offtopic]:\n"
) )
skip_categories = categories.replace(",", " ").split(" ") skip_categories = categories.replace(",", " ").split(" ")
skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings
@@ -111,14 +71,12 @@ def main(config, config_file, debug):
) )
skip_categories = categories.replace(",", " ").split(" ") skip_categories = categories.replace(",", " ").split(" ")
skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings
config["skip_categories"] = skip_categories config.skip_categories = skip_categories
try:
channel_whitelist = config["channel_whitelist"]
except:
channel_whitelist = []
channel_whitelist = config.channel_whitelist
if input("Do you want to whitelist any channels from being ad-blocked? (y/n) ") == "y": if input("Do you want to whitelist any channels from being ad-blocked? (y/n) ") == "y":
if(not apikey):
print("WARNING: You need to specify an API key to use this function, otherwise the program will fail to start.\nYou can add one by re-running this setup wizard.")
web_session = aiohttp.ClientSession() web_session = aiohttp.ClientSession()
while True: while True:
channel_info = {} channel_info = {}
@@ -158,7 +116,8 @@ def main(config, config_file, debug):
# Close web session asynchronously # Close web session asynchronously
loop.run_until_complete(web_session.close()) loop.run_until_complete(web_session.close())
config["channel_whitelist"] = channel_whitelist config.channel_whitelist = channel_whitelist
config.skip_count_tracking = not input("Do you want to report skipped segments to sponsorblock. Only the segment UUID will be sent? (y/n) ") == "n"
print("Config finished") print("Config finished")
save_config(config, config_file) config.save()

View File

@@ -4,3 +4,16 @@ SponsorBlock_actiontype = "skip"
SponsorBlock_api = "https://sponsor.ajay.app/api/" SponsorBlock_api = "https://sponsor.ajay.app/api/"
Youtube_api = "https://www.googleapis.com/youtube/v3/" Youtube_api = "https://www.googleapis.com/youtube/v3/"
skip_categories = (
('Sponsor', 'sponsor'),
('Self Promotion', 'selfpromo'),
('Intro', 'intro'),
('Outro', 'outro'),
('Music Offtopic', 'music_offtopic'),
('Interaction', 'interaction'),
('Exclusive Access', 'exclusive_access'),
('POI Highlight', 'poi_highlight'),
('Preview', 'preview'),
('Filler', 'filler'),
)

View File

@@ -0,0 +1,142 @@
"""Send out a M-SEARCH request and listening for responses."""
import asyncio
import socket
import aiohttp
import ssdp
from ssdp import network
import xmltodict
'''
Redistribution and use of the DIAL DIscovery And Launch protocol specification (the “DIAL Specification”), with or without modification,
are permitted provided that the following conditions are met:
● Redistributions of the DIAL Specification must retain the above copyright notice, this list of conditions and the following
disclaimer.
● Redistributions of implementations of the DIAL Specification in source code form must retain the above copyright notice, this
list of conditions and the following disclaimer.
● Redistributions of implementations of the DIAL Specification in binary form must include the above copyright notice.
● The DIAL mark, the NETFLIX mark and the names of contributors to the DIAL Specification may not be used to endorse or
promote specifications, software, products, or any other materials derived from the DIAL Specification without specific prior
written permission. The DIAL mark is owned by Netflix and information on licensing the DIAL mark is available at
www.dial-multiscreen.org.'''
'''
MIT License
Copyright (c) 2018 Johannes Hoppe
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.'''
'''Modified code from https://github.com/codingjoe/ssdp/blob/main/ssdp/__main__.py'''
def get_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0)
try:
# doesn't even have to be reachable
s.connect(('10.254.254.254', 1))
IP = s.getsockname()[0]
except Exception:
IP = '127.0.0.1'
finally:
s.close()
return IP
class Handler(ssdp.aio.SSDP):
def __init__(self):
super().__init__()
self.devices = []
def clear(self):
self.devices = []
def __call__(self):
return self
def response_received(self, response: ssdp.messages.SSDPResponse, addr):
headers = response.headers
headers = {k.lower(): v for k, v in headers}
# print(headers)
if "location" in headers:
self.devices.append(headers["location"])
async def find_youtube_app(web_session, url_location):
async with web_session.get(url_location) as response:
headers = response.headers
response = await response.text()
# print(headers)
data = xmltodict.parse(response)
name = data["root"]["device"]["friendlyName"]
handler = Handler()
handler.clear()
app_url = headers["application-url"]
youtube_url = app_url + "YouTube"
# print(youtube_url)
async with web_session.get(youtube_url) as response:
status_code = response.status
response = await response.text()
# print(status_code)
if status_code == 200:
data = xmltodict.parse(response)
data = data["service"]
screen_id = data["additionalData"]["screenId"]
return {"screen_id": screen_id, "name": name, "offset": 0}
async def discover(web_session):
bind = None
search_target = "urn:dial-multiscreen-org:service:dial:1"
max_wait = 10
handler = Handler()
"""Send out an M-SEARCH request and listening for responses."""
family, addr = network.get_best_family(bind, network.PORT)
loop = asyncio.get_event_loop()
ip_address = get_ip()
connect = loop.create_datagram_endpoint(handler, family=family, local_addr=(ip_address, None))
transport, protocol = await connect
target = network.MULTICAST_ADDRESS_IPV4, network.PORT
search_request = ssdp.messages.SSDPRequest(
"M-SEARCH",
headers={
"HOST": "%s:%d" % target,
"MAN": '"ssdp:discover"',
"MX": str(max_wait), # seconds to delay response [1..5]
"ST": search_target,
},
)
target = network.MULTICAST_ADDRESS_IPV4, network.PORT
search_request.sendto(transport, target)
# print(search_request, addr[:2])
try:
await asyncio.sleep(4)
finally:
transport.close()
devices = []
for i in handler.devices:
devices.append(await find_youtube_app(web_session, i))
return devices

View File

@@ -1,55 +1,118 @@
import argparse import argparse
from . import config_setup
from . import main
from . import macos_install
import json import json
import os
import logging import logging
import os
import sys import sys
import time
def load_config(config_file): from . import config_setup, main, setup_wizard
if os.path.exists(config_file):
try:
with open(config_file, "r") as f: class Device:
config = json.load(f) def __init__(self, args_dict):
except: self.screen_id = ""
print("Creating config file") self.offset = 0
config = {} self.__load(args_dict)
else: self.__validate()
if os.getenv("iSPBTV_docker"):
def __load(self, args_dict):
for i in args_dict:
setattr(self, i, args_dict[i])
# Change offset to seconds (from milliseconds)
self.offset = self.offset / 1000
def __validate(self):
if not self.screen_id:
raise ValueError("No screen id found")
class Config:
def __init__(self, data_dir):
self.data_dir = data_dir
self.config_file = data_dir + "/config.json"
self.devices = []
self.apikey = ""
self.skip_categories = []
self.channel_whitelist = []
self.skip_count_tracking = True
self.mute_ads = False
self.skip_ads = False
self.__load()
def validate(self):
if hasattr(self, "atvs"):
print( print(
"You are running in docker, you have to mount the config file.\nPlease check the README.md for more information." "The atvs config option is deprecated and has stopped working. Please read this for more information on how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2",
) )
print("Exiting in 10 seconds...")
time.sleep(10)
sys.exit() sys.exit()
return if not self.devices:
else: print("No devices found, please add at least one device")
print("Creating config file") print("Exiting in 10 seconds...")
config = {} # Create blank config to setup time.sleep(10)
return config sys.exit()
self.devices = [Device(i) for i in self.devices]
if not self.apikey and self.channel_whitelist:
raise ValueError("No youtube API key found and channel whitelist is not empty")
if not self.skip_categories:
self.categories = ["sponsor"]
print("No categories found, using default: sponsor")
def __load(self):
try:
with open(self.config_file, "r") as f:
config = json.load(f)
for i in config:
setattr(self, i, config[i])
except FileNotFoundError:
print("Could not load config file")
# Create data directory if it doesn't exist (if we're not running in docker)
if not os.path.exists(self.data_dir):
if not os.getenv("iSPBTV_docker"):
print("Creating data directory")
os.makedirs(self.data_dir)
else: # Running in docker without mounting the data dir
print("Running in docker without mounting the data dir, check the wiki for more information: "
"https://github.com/dmunozv04/iSponsorBlockTV/wiki/Installation#Docker")
print("Exiting in 10 seconds...")
time.sleep(10)
sys.exit()
else:
print("Blank config file created")
def save(self):
with open(self.config_file, "w") as f:
config_dict = self.__dict__
# Don't save the config file name
config_file = self.config_file
del config_dict["config_file"]
json.dump(config_dict, f, indent=4)
self.config_file = config_file
def __eq__(self, other):
if isinstance(other, Config):
return self.__dict__ == other.__dict__
return False
def app_start(): def app_start():
parser = argparse.ArgumentParser(description="iSponsorblockTV") parser = argparse.ArgumentParser(description="iSponsorblockTV")
parser.add_argument("--file", "-f", default="config.json", help="config file") parser.add_argument("--data-dir", "-d", default="data", help="data directory")
parser.add_argument("--setup", "-s", action="store_true", help="setup the program") parser.add_argument("--setup", "-s", action="store_true", help="setup the program graphically")
parser.add_argument("--debug", "-d", action="store_true", help="debug mode") parser.add_argument("--setup-cli", "-sc", action="store_true", help="setup the program in the command line")
parser.add_argument("--macos_install", action="store_true", help="install in macOS") parser.add_argument("--debug", action="store_true", help="debug mode")
args = parser.parse_args() args = parser.parse_args()
config = load_config(args.file) config = Config(args.data_dir)
if args.debug: if args.debug:
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
if args.setup: # Setup the config file if args.setup: # Set up the config file graphically
config_setup.main(config, args.file, args.debug) setup_wizard.main(config)
elif args.macos_install: sys.exit()
macos_install.main() if args.setup_cli: # Set up the config file
config_setup.main(config, args.debug)
else: else:
try: # Check if config file has the correct structure config.validate()
config["atvs"], config["apikey"], config["skip_categories"], config["channel_whitelist"] main.main(config, args.debug)
except: # If not, ask to setup the program
print("invalid config file, please run with --setup")
sys.exit()
main.main(
config["atvs"], config["apikey"], config["skip_categories"], config["channel_whitelist"], args.debug
)

View File

@@ -1,7 +1,7 @@
import plistlib import plistlib
import os import os
from . import config_setup from . import config_setup
"""Not updated to V2 yet, should still work. Here be dragons"""
default_plist = { default_plist = {
"Label": "com.dmunozv04iSponsorBlockTV", "Label": "com.dmunozv04iSponsorBlockTV",
"RunAtLoad": True, "RunAtLoad": True,

View File

@@ -1,148 +1,147 @@
import asyncio import asyncio
import pyatv
import aiohttp import aiohttp
import time import time
import logging import logging
from . import api_helpers from . import api_helpers, ytlounge
import traceback
class MyPushListener(pyatv.interface.PushListener): class DeviceListener:
task = None def __init__(self, api_helper, config, screen_id, offset):
apikey = None self.api_helper = api_helper
rc = None self.lounge_controller = ytlounge.YtLoungeApi(screen_id, config, api_helper)
self.offset = offset
self.cancelled = False
web_session = None # Ensures that we have a valid auth token
categories = ["sponsor"] async def refresh_auth_loop(self):
whitelist = [] while True:
await asyncio.sleep(60 * 60 * 24) # Refresh every 24 hours
try:
await self.lounge_controller.refresh_auth()
except:
# traceback.print_exc()
pass
def __init__(self, apikey, atv, categories, whitelist, web_session): async def is_available(self):
self.apikey = apikey try:
self.rc = atv.remote_control return await self.lounge_controller.is_available()
self.web_session = web_session except:
self.categories = categories # traceback.print_exc()
self.whitelist = whitelist return False
self.atv = atv
def playstatus_update(self, updater, playstatus): # Main subscription loop
logging.debug("Playstatus update" + str(playstatus)) async def loop(self):
lounge_controller = self.lounge_controller
while not lounge_controller.linked():
try:
await lounge_controller.refresh_auth()
except:
# traceback.print_exc()
await asyncio.sleep(10)
while not self.cancelled:
while not (await self.is_available()) and not self.cancelled:
await asyncio.sleep(10)
try:
await lounge_controller.connect()
except:
pass
while not lounge_controller.connected() and not self.cancelled:
await asyncio.sleep(10)
try:
await lounge_controller.connect()
except:
pass
# print(f"Connected to device {lounge_controller.screen_name}")
try:
print("Subscribing to lounge")
sub = await lounge_controller.subscribe_monitored(self)
await sub
print("Subscription ended")
except:
pass
# Method called on playback state change
async def __call__(self, state):
logging.debug("Playstatus update" + str(state))
try: try:
self.task.cancel() self.task.cancel()
except: except:
pass pass
time_start = time.time() time_start = time.time()
self.task = asyncio.create_task( self.task = asyncio.create_task(self.process_playstatus(state, time_start))
process_playstatus(
playstatus,
self.apikey,
self.rc,
self.web_session,
self.categories,
self.atv,
time_start,
self.whitelist
)
)
def playstatus_error(self, updater, exception): # Processes the playback state change
logging.error(exception) async def process_playstatus(self, state, time_start):
print("stopped") segments = []
if state.videoId:
segments = await self.api_helper.get_segments(state.videoId)
async def process_playstatus(
playstatus, apikey, rc, web_session, categories, atv, time_start, whitelist
):
logging.debug("App playing is:" + str(atv.metadata.app.identifier))
if (
playstatus.device_state == playstatus.device_state.Playing
and atv.metadata.app.identifier == "com.google.ios.youtube"
):
vid_id = await api_helpers.get_vid_id(
playstatus.title, playstatus.artist, apikey, web_session
)
if vid_id:
print(f"ID: {vid_id[0]}, Channel ID: {vid_id[1]}")
for i in whitelist:
if vid_id[1] == i["id"]:
print("Channel whitelisted, skipping.")
return
segments = await api_helpers.get_segments(vid_id[0], web_session, categories)
print(segments) print(segments)
await time_to_segment( if state.state.value == 1 and segments: # Playing and has segments to skip
segments, playstatus.position, rc, time_start, web_session await self.time_to_segment(segments, state.currentTime, time_start)
)
else:
print("Could not find video id")
# Finds the next segment to skip to and skips to it
async def time_to_segment(self, segments, position, time_start):
start_next_segment = None
next_segment = None
for segment in segments:
if position < 2 and (
segment["start"] <= position < segment["end"]
):
next_segment = segment
start_next_segment = position # different variable so segment doesn't change
break
if segment["start"] > position:
next_segment = segment
start_next_segment = next_segment["start"]
break
if start_next_segment:
time_to_next = start_next_segment - position - (time.time() - time_start) - self.offset
await self.skip(time_to_next, next_segment["end"], next_segment["UUID"])
async def time_to_segment(segments, position, rc, time_start, web_session): # Skips to the next segment (waits for the time to pass)
position = position + (time.time() - time_start) async def skip(self, time_to, position, UUID):
for segment in segments: await asyncio.sleep(time_to)
if position < 2 and ( asyncio.create_task(self.lounge_controller.seek_to(position))
position >= segment["start"] and position < segment["end"] asyncio.create_task(
): self.api_helper.mark_viewed_segments(UUID)
next_segment = [position, segment["end"]] ) # Don't wait for this to finish
break
if segment["start"] > position:
next_segment = segment
break
time_to_next = next_segment["start"] - position
await skip(time_to_next, next_segment["end"], next_segment["UUID"], rc, web_session)
# Stops the connection to the device
async def skip(time_to, position, UUID, rc, web_session): async def cancel(self):
await asyncio.sleep(time_to) self.cancelled = True
await rc.set_position(position)
# await api_helpers.viewed_segments(UUID, web_session) DISABLED FOR NOW
async def connect_atv(loop, identifier, airplay_credentials):
"""Find a device and print what is playing."""
print("Discovering devices on network...")
atvs = await pyatv.scan(loop, identifier=identifier)
if not atvs:
print("No device found, will retry")
return
config = atvs[0]
config.set_credentials(pyatv.Protocol.AirPlay, airplay_credentials)
print(f"Connecting to {config.address}")
return await pyatv.connect(config, loop)
async def loop_atv(event_loop, atv_config, apikey, categories, whitelist, web_session):
identifier = atv_config["identifier"]
airplay_credentials = atv_config["airplay_credentials"]
atv = await connect_atv(event_loop, identifier, airplay_credentials)
if atv:
listener = MyPushListener(apikey, atv, categories, whitelist, web_session)
atv.push_updater.listener = listener
atv.push_updater.start()
print("Push updater started")
while True:
await asyncio.sleep(20)
try: try:
atv.metadata.app self.task.cancel()
except: except Exception as e:
print("Reconnecting to Apple TV") traceback.print_exc()
# reconnect to apple tv
atv = await connect_atv(event_loop, identifier, airplay_credentials)
if atv:
listener = MyPushListener(apikey, atv, categories, whitelist, web_session)
atv.push_updater.listener = listener
atv.push_updater.start()
print("Push updater started")
def main(atv_configs, apikey, categories, whitelist, debug): async def finish(devices):
for i in devices:
await i.cancel()
def main(config, debug):
loop = asyncio.get_event_loop_policy().get_event_loop() loop = asyncio.get_event_loop_policy().get_event_loop()
tasks = [] # Save the tasks so the interpreter doesn't garbage collect them
devices = [] # Save the devices to close them later
if debug: if debug:
loop.set_debug(True) loop.set_debug(True)
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
web_session = aiohttp.ClientSession() tcp_connector = aiohttp.TCPConnector(ttl_dns_cache=300)
for i in atv_configs: web_session = aiohttp.ClientSession(loop=loop, connector=tcp_connector)
loop.create_task(loop_atv(loop, i, apikey, categories, whitelist, web_session)) api_helper = api_helpers.ApiHelper(config, web_session)
loop.run_forever() for i in config.devices:
device = DeviceListener(api_helper, config, i.screen_id, i.offset)
devices.append(device)
tasks.append(loop.create_task(device.loop()))
tasks.append(loop.create_task(device.refresh_auth_loop()))
try:
loop.run_forever()
except KeyboardInterrupt as e:
print("Keyboard interrupt detected, cancelling tasks and exiting...")
traceback.print_exc()
loop.run_until_complete(finish(devices))
finally:
loop.run_until_complete(web_session.close())

View File

@@ -0,0 +1,365 @@
.container {
background: $boost;
margin: 1;
padding: 1 1 0 1;
height: auto;
width: 100%;
}
.title {
text-style: bold underline;
dock: top;
background: $primary-background;
width: 100%;
}
.subtitle{
width: 100%;
}
#setup-wizard{
scrollbar-gutter: stable;
}
.small-button{
height: 3;
}
.button-100 {
width: 100%;
}
/* Exit screen */
ExitScreen {
align: center middle;
}
#dialog-exit {
grid-size: 3;
grid-gutter: 1 2;
grid-rows: 1fr 3;
padding: 1 2;
width: 35%;
min-width: 50;
max-width: 70;
height: 11;
border: thick $background 80%;
background: $surface;
}
#question {
column-span: 3;
height: 1fr;
width: 1fr;
content-align: center middle;
}
/* Device editor */
EditDevice {
align: center middle;
}
#edit-device-container {
padding: 1 2 0 2;
background: $surface;
border: thick $background 80%;
height: 17;
width: 50%;
min-width: 40;
}
#device-id-container{
grid-size: 4;
grid-gutter: 1 2;
grid-rows: 1fr 3;
width: 100%;
margin-right: 1;
}
#device-id-input{
column-span: 3;
height: 1fr;
width: 1fr;
content-align: center middle;
}
#device-id-view{
width: 1fr;
}
#device-offset-container{
width: 100%;
}
#device-offset-input{
width: 12;
}
#device-offset-slider{
width: 100%;
margin-right: 12;
}
/* devices */
#devices-manager {
min-height: 4;
height: auto;
max-height: 70%;
width: 100%;
scrollbar-gutter: stable;
overflow-y: auto;
}
Element {
background: $panel;
border-top: solid $panel-lighten-2;
layout: horizontal;
height: 2;
width: 100%;
margin: 0 1 0 1;
padding: 0;
}
Element > .element-name {
offset: 0 -1;
padding: 0;
width: 100%;
align: left middle;
text-align: left;
}
Element > .element-remove {
dock: right;
padding: 0;
align: left middle;
text-align: center;
width: 8;
min-width: 8;
margin: 0 1 0 0;
}
#add-device {
text-style: bold;
width: 100%;
align: left middle;
text-align: center;
dock: left;
text-align: left;
}
#add-device-button-container{
height: 1;
width: 100%;
margin: 1 1 0 1;
}
/* Add devices */
#add-device-container{
border: thick $background 80%;
background: $surface;
padding: 1 2 0 2;
height: auto;
width: 50%;
min-width: 40;
}
#add-device-switch-buttons{
grid-size: 2 1;
height: 5;
width: 100%;
padding: 1 1;
}
#add-device-pin-container{
height: auto;
}
#add-device-dial-container{
height: auto;
}
#pairing-code-input.-valid {
border: tall $success 60%;
}
#pairing-code-input.-valid:focus {
border: tall $success;
}
#add-device-pin-add-button{
margin: 1 1 0 1;
width: 100%;
}
#add-device-dial-add-button{
margin: 1 1 0 1;
width: 100%;
}
#add-device-info{
height: auto;
min-height: 0;
width: 100%;
padding: 0 1;
}
/* ApiKey */
#api-key-grid{
grid-size: 4;
grid-gutter: 1 2;
grid-rows: 1fr 3;
padding: 1 1;
width: 100%;
height: 5;
}
#api-key-input{
column-span: 3;
height: 1fr;
width: 1fr;
content-align: center middle;
}
/* Skip Categories */
#skip-categories-manager{
min-height: 10;
height: auto;
max-height: 70%;
width: 100%;
scrollbar-gutter: stable;
}
#skip-categories-selection-list{
height: auto;
width: 100%;
background: $boost;
margin: 1;
}
/* Segment count tracking */
#skip-count-tracking-switch{
margin: 1;
}
/* Channel Whitelist */
#channel-whitelist-manager {
min-height: 5;
height: auto;
max-height: 70%;
width: 100%;
scrollbar-gutter: stable;
overflow-y: auto;
}
#add-channel {
text-style: bold;
width: 100%;
align: left middle;
text-align: center;
dock: left;
text-align: left;
}
#add-channel-button-container{
height: 1;
width: 100%;
margin: 1 1 0 1;
}
/* Add Channel */
AddChannel{
align: center middle;
overflow-y: auto;
background: $background 60%;
}
#add-channel-container{
padding: 1 2 0 2;
border: thick $background 80%;
background: $surface;
height: auto;
width: 50%;
min-width: 40;
}
#add-channel-switch-buttons{
grid-size: 2 1;
height: 5;
width: 100%;
padding: 1 1;
}
.button-switcher{
width: 100%;
text-align: center;
}
#add-channel-switcher{
height: auto;
width: auto;
}
#add-channel-switcher-container{
height: auto;
width: 100%;
}
#add-channel-search-container{
height: auto;
}
#add-channel-search-inputs{
height: 3;
width: 100%;
grid-size: 4 1;
margin: 0 1 0 0;
/* padding: 0 1; */
}
#channel-name-input-search{
width: 100%;
height: auto;
column-span: 3;
}
#search-channel-button{
width: 1fr;
height: auto;
column-span: 1;
margin: 0 0 0 0;
}
#channel-search-results{
height: auto;
width: 100%;
background: $boost;
margin: 1;
}
#add-channel-info{
height: auto;
width: 100%;
margin: 0 1;
}
#add-channel-id-container{
height: auto;
}
#channel-name-input-id{
margin: 1 0;
}
#add-channel-search-no-key{
padding: 0 0 1 1;
}
/* Mute/Skip ads */
#ad-skip-mute-container{
padding: 1;
height: auto;
}
/* Migrate screen */
MigrationScreen {
align: center middle;
}
#dialog-migration {
grid-size: 2;
grid-gutter: 1 2;
grid-rows: 1fr 3;
padding: 1 2;
width: 35%;
min-width: 50;
max-width: 70;
height: 11;
border: thick $background 80%;
background: $surface;
}
#question-migrate {
column-span: 2;
height: 1fr;
width: 1fr;
content-align: center middle;
}

View File

@@ -0,0 +1,701 @@
import aiohttp
import asyncio
import copy
# Textual imports (Textual is awesome!)
from textual import on
from textual.app import App, ComposeResult
from textual.containers import ScrollableContainer, Grid, Container, Vertical, Horizontal
from textual.events import Click
from textual.screen import Screen
from textual.validation import Function
from textual.widgets import Button, Footer, Header, Static, Label, Input, SelectionList, Checkbox, ContentSwitcher, \
RadioSet, RadioButton
from textual.widgets.selection_list import Selection
from textual_slider import Slider
# Local imports
from . import api_helpers, ytlounge
from .constants import skip_categories
def _validate_pairing_code(pairing_code: str) -> bool:
try:
pairing_code = pairing_code.replace("-", "").replace(" ", "")
int(pairing_code)
return len(pairing_code) == 12
except ValueError:
return False # not a number
class ModalWithClickExit(Screen):
"""A modal screen that exits when clicked outside its bounds.
https://discord.com/channels/1026214085173461072/1033754296224841768/1136015817356611604"""
DEFAULT_CSS = """
ModalWithClickExit {
align: center middle;
layout: vertical;
overflow-y: auto;
background: $background 60%;
}
"""
@on(Click)
def close_out_bounds(self, event: Click) -> None:
if self.get_widget_at(event.screen_x, event.screen_y)[0] is self:
self.dismiss()
class Element(Static):
"""Base class for elements (devices and channels).
It has a name and a remove button.
"""
def __init__(self, element: dict, tooltip: str = None, **kwargs) -> None:
super().__init__(**kwargs)
self.element_data = element
self.element_name = ""
self.process_values_from_data()
self.tooltip = tooltip
def process_values_from_data(self):
pass
def compose(self) -> ComposeResult:
yield Button(label=self.element_name, classes="element-name", disabled=True, id="element-name")
yield Button("Remove", classes="element-remove", variant="error", id="element-remove")
def on_mount(self) -> None:
if self.tooltip:
self.query_one(".element-name").tooltip = self.tooltip
self.query_one(".element-name").disabled = False
class Device(Element):
"""A device element."""
def process_values_from_data(self):
print("HIIII")
print(self.element_data)
if "name" in self.element_data and self.element_data["name"]:
self.element_name = self.element_data["name"]
else:
self.element_name = f"Unnamed device with id {self.element_data['screen_id'][:5]}...{self.element_data['screen_id'][-5:]}"
class Channel(Element):
"""A channel element."""
def process_values_from_data(self):
if "name" in self.element_data:
self.element_name = self.element_data["name"]
else:
self.element_name = f"Unnamed channel with id {self.element_data['channel_id']}"
class ChannelRadio(RadioButton):
"""A radio button for a channel."""
def __init__(self, channel: tuple, **kwargs) -> None:
label = f"{channel[1]} - Subs: {channel[2]}"
super().__init__(label=label, **kwargs)
self.channel_data = channel
class MigrationScreen(ModalWithClickExit):
"""Screen with a dialog to remove old ATVS config."""
BINDINGS = [("escape", "dismiss()", "Cancel"),
("s", "remove_and_save", "Remove and save"),
("q,ctrl+c", "exit", "Exit")]
AUTO_FOCUS = "#exit-save"
def compose(self) -> ComposeResult:
yield Grid(
Label(
"Welcome to the new configurator! You seem to have the legacy 'atvs' entry on your config file, do you want to remove it?\n(The app won't start with it present)",
id="question", classes="button-100"),
Button("Remove and save", variant="primary", id="migrate-remove-save", classes="button-100"),
Button("Don't remove", variant="error", id="migrate-no-change", classes="button-100"),
id="dialog-migration",
)
def action_exit(self) -> None:
self.app.exit()
@on(Button.Pressed, "#migrate-no-change")
def action_no_change(self) -> None:
self.app.pop_screen()
@on(Button.Pressed, "#migrate-remove-save")
def action_remove_and_save(self) -> None:
del self.app.config.atvs
self.app.config.save()
self.app.pop_screen()
class ExitScreen(ModalWithClickExit):
"""Screen with a dialog to exit."""
BINDINGS = [("escape", "dismiss()", "Cancel"),
("s", "save", "Save"),
("q,ctrl+c", "exit", "Exit")]
AUTO_FOCUS = "#exit-save"
def compose(self) -> ComposeResult:
yield Grid(
Label("Are you sure you want to exit without saving?", id="question", classes="button-100"),
Button("Save", variant="success", id="exit-save", classes="button-100"),
Button("Don't save", variant="error", id="exit-no-save", classes="button-100"),
Button("Cancel", variant="primary", id="exit-cancel", classes="button-100"),
id="dialog-exit",
)
def action_exit(self) -> None:
self.app.exit()
def action_save(self) -> None:
self.app.config.save()
self.app.exit()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "exit-no-save":
self.app.exit()
elif event.button.id == "exit-save":
self.app.config.save()
self.app.exit()
else:
self.app.pop_screen()
class AddDevice(ModalWithClickExit):
"""Screen with a dialog to add a device, either with a pairing code or with lan discovery."""
BINDINGS = [("escape", "dismiss({})", "Return")]
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
web_session = aiohttp.ClientSession()
self.api_helper = api_helpers.ApiHelper(config, web_session)
self.devices_discovered_dial = []
def compose(self) -> ComposeResult:
with Container(id="add-device-container"):
yield Label("Add Device", classes="title")
with Grid(id="add-device-switch-buttons"):
yield Button("Add with pairing code", id="add-device-pin-button", classes="button-switcher")
yield Button("Add with lan discovery", id="add-device-dial-button", classes="button-switcher")
with ContentSwitcher(id="add-device-switcher", initial="add-device-pin-container"):
with Container(id="add-device-pin-container"):
yield Input(placeholder="Pairing Code (found in Settings - Link with TV code)",
id="pairing-code-input",
validators=[
Function(_validate_pairing_code, "Invalid pairing code format")
]
)
yield Input(placeholder="Device Name (auto filled if empty/optional)", id="device-name-input")
yield Button("Add", id="add-device-pin-add-button", variant="success", disabled=True)
yield Label(id="add-device-info")
with Container(id="add-device-dial-container"):
yield Label(
"Make sure your device is on the same network as this computer\nIf it isn't showing up, try restarting the app.\nIf running in docker, make sure to use `--network=host`\nTo refresh the list, close and open the dialog again",
classes="subtitle")
yield SelectionList(("Searching for devices...", "", False), id="dial-devices-list", disabled=True)
yield Button("Add selected devices", id="add-device-dial-add-button", variant="success",
disabled=True)
async def on_mount(self) -> None:
self.devices_discovered_dial = []
asyncio.create_task(self.task_discover_devices())
async def task_discover_devices(self):
devices_found = await self.api_helper.discover_youtube_devices_dial()
list_widget: SelectionList = self.query_one("#dial-devices-list")
list_widget.clear_options()
if devices_found:
# print(devices_found)
devices_found_parsed = []
for index, i in enumerate(devices_found):
devices_found_parsed.append(Selection(i["name"], index, False))
list_widget.add_options(devices_found_parsed)
self.query_one("#dial-devices-list").disabled = False
self.devices_discovered_dial = devices_found
else:
list_widget.add_option(("No devices found", "", False))
@on(Button.Pressed, "#add-device-switch-buttons > *")
def handle_switch_buttons(self, event: Button.Pressed) -> None:
button_ = event.button.id
self.query_one("#add-device-switcher").current = event.button.id.replace("-button", "-container")
@on(Input.Changed, "#pairing-code-input")
def changed_pairing_code(self, event: Input.Changed):
self.query_one("#add-device-button").disabled = not event.validation_result.is_valid
@on(Input.Submitted, "#pairing-code-input")
@on(Button.Pressed, "#add-device-pin-add-button")
async def handle_add_device_pin(self) -> None:
self.query_one("#add-device-button").disabled = True
lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV")
pairing_code = self.query_one("#pairing-code-input").value
pairing_code = int(pairing_code.replace("-", "").replace(" ", "")) # remove dashes and spaces
device_name = self.parent.query_one("#device-name-input").value
paired = False
try:
paired = await lounge_controller.pair(pairing_code)
except:
pass
if paired:
device = {
"screen_id": lounge_controller.auth.screen_id,
"name": device_name if device_name else lounge_controller.screen_name,
"offset": 0,
}
self.query_one("#pairing-code-input").value = ""
self.query_one("#device-name-input").value = ""
self.query_one("#add-device-info").update(f"[#00ff00][b]Successfully added {device['name']}")
self.dismiss([device])
else:
self.query_one("#pairing-code-input").value = ""
self.query_one("#add-device-button").disabled = False
self.query_one("#add-device-info").update("[#ff0000]Failed to add device")
@on(Button.Pressed, "#add-device-dial-add-button")
def handle_add_device_dial(self) -> None:
list_widget: SelectionList = self.query_one("#dial-devices-list")
selected_devices = list_widget.selected
devices = []
for i in selected_devices:
devices.append(self.devices_discovered_dial[i])
self.dismiss(devices)
@on(SelectionList.SelectedChanged, "#dial-devices-list")
def changed_device_list(self, event: SelectionList.SelectedChanged):
self.query_one("#add-device-dial-add-button").disabled = not event.selection_list.selected
class AddChannel(ModalWithClickExit):
"""Screen with a dialog to add a channel, either using search or with a channel id."""
BINDINGS = [("escape", "dismiss(())", "Return")]
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
web_session = aiohttp.ClientSession()
self.api_helper = api_helpers.ApiHelper(config, web_session)
def compose(self) -> ComposeResult:
with Container(id="add-channel-container"):
yield Label("Add Channel", classes="title")
yield Label(
"Select a method to add a channel. Adding via search only works if a YouTube api key has been set",
id="add-channel-label", classes="subtitle")
with Grid(id="add-channel-switch-buttons"):
yield Button("Add by channel name", id="add-channel-search-button", classes="button-switcher")
yield Button("Add by channel ID", id="add-channel-id-button", classes="button-switcher")
yield Label(id="add-channel-info", classes="subtitle")
with ContentSwitcher(id="add-channel-switcher", initial="add-channel-search-container"):
with Vertical(id="add-channel-search-container"):
if self.config.apikey:
with Grid(id="add-channel-search-inputs"):
yield Input(placeholder="Enter channel name", id="channel-name-input-search")
yield Button("Search", id="search-channel-button", variant="success")
yield RadioSet(RadioButton(label="Search to see results", disabled=True),
id="channel-search-results")
yield Button("Add", id="add-channel-button-search", variant="success", disabled=True,
classes="button-100")
else:
yield Label(
"[#ff0000]No api key set, cannot search for channels. You can add it the config section below",
id="add-channel-search-no-key", classes="subtitle")
with Vertical(id="add-channel-id-container"):
yield Input(placeholder="Enter channel ID (example: UCuAXFkgsw1L7xaCfnd5JJOw)",
id="channel-id-input")
yield Input(placeholder="Enter channel name (only used to display in the config file)",
id="channel-name-input-id")
yield Button("Add", id="add-channel-button-id", variant="success", classes="button-100")
@on(RadioSet.Changed, "#channel-search-results")
def handle_radio_set_changed(self, event: RadioSet.Changed) -> None:
self.query_one("#add-channel-button-search").disabled = False
@on(Button.Pressed, "#add-channel-switch-buttons > *")
def handle_switch_buttons(self, event: Button.Pressed) -> None:
button_ = event.button.id
self.query_one("#add-channel-switcher").current = event.button.id.replace("-button", "-container")
@on(Button.Pressed, "#search-channel-button")
@on(Input.Submitted, "#channel-name-input-search")
async def handle_search_channel(self) -> None:
channel_name = self.query_one("#channel-name-input-search").value
if not channel_name:
self.query_one("#add-channel-info").update("[#ff0000]Please enter a channel name")
return
self.query_one("#search-channel-button").disabled = True
self.query_one("#add-channel-info").update("Searching...")
self.query_one("#add-channel-button-search").disabled = True
self.query_one("#channel-search-results").remove_children()
try:
channels_list = await self.api_helper.search_channels(channel_name)
except:
self.query_one("#add-channel-info").update("[#ff0000]Failed to search for channel")
self.query_one("#search-channel-button").disabled = False
return
for i in channels_list:
self.query_one("#channel-search-results").mount(ChannelRadio(i))
if channels_list:
self.query_one("#search-channel-button").disabled = False
self.query_one("#add-channel-info").update("")
@on(Button.Pressed, "#add-channel-button-search")
def handle_add_channel_search(self) -> None:
channel = self.query_one("#channel-search-results").pressed_button.channel_data
if not channel:
self.query_one("#add-channel-info").update("[#ff0000]Please select a channel")
return
self.query_one("#add-channel-info").update("Adding...")
self.dismiss(channel)
@on(Button.Pressed, "#add-channel-button-id")
@on(Input.Submitted, "#channel-id-input")
@on(Input.Submitted, "#channel-name-input-id")
def handle_add_channel_id(self) -> None:
channel_id = self.query_one("#channel-id-input").value
channel_name = self.query_one("#channel-name-input-id").value
if not channel_id:
self.query_one("#add-channel-info").update("[#ff0000]Please enter a channel ID")
return
if not channel_name:
channel_name = channel_id
channel = (channel_id, channel_name, "hidden")
self.query_one("#add-channel-info").update("Adding...")
self.dismiss(channel)
class EditDevice(ModalWithClickExit):
"""Screen with a dialog to edit a device. Used by the DevicesManager."""
BINDINGS = [("escape", "close_screen_saving", "Return")]
def __init__(self, device: Element, **kwargs) -> None:
super().__init__(**kwargs)
self.device_data = device.element_data
self.device_widget = device
def action_close_screen_saving(self) -> None:
self.dismiss()
def dismiss(self) -> None:
self.device_data["name"] = self.query_one("#device-name-input").value
self.device_data["screen_id"] = self.query_one("#device-id-input").value
self.device_data["offset"] = int(self.query_one("#device-offset-input").value)
super().dismiss(self.device_widget)
def compose(self) -> ComposeResult:
name = self.device_data.get("name", "")
offset = self.device_data.get("offset", 0)
with Container(id="edit-device-container"):
yield Label("Edit device (ESCAPE to exit)", classes="title")
yield Label("Device name")
yield Input(placeholder="Device name", id="device-name-input", value=name)
yield Label("Device screen id")
with Grid(id="device-id-container"):
yield Input(placeholder="Device id", id="device-id-input", value=self.device_data["screen_id"],
password=True)
yield Button("Show id", id="device-id-view")
yield Label("Device offset (in milliseconds)")
with Horizontal(id="device-offset-container"):
yield Input(id="device-offset-input", value=str(offset))
yield Slider(name="Device offset", id="device-offset-slider", min=0, max=2000, step=100, value=offset)
def on_slider_changed(self, event: Slider.Changed) -> None:
input = self.query_one("#device-offset-input")
with input.prevent(Input.Changed):
input.value = str(event.slider.value)
def on_input_changed(self, event: Input.Changed):
if event.input.id == "device-offset-input":
value = event.input.value
if value.isdigit():
value = int(value)
slider = self.query_one("#device-offset-slider")
with slider.prevent(Slider.Changed):
self.query_one("#device-offset-slider").value = value
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "device-id-view":
if "Show" in event.button.label:
event.button.label = "Hide id"
self.query_one("#device-id-input").password = False
else:
event.button.label = "Show id"
self.query_one("#device-id-input").password = True
class DevicesManager(Vertical):
"""Manager for devices, allows to add, edit and remove devices."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
self.devices = config.devices
def compose(self) -> ComposeResult:
yield Label("Devices", classes="title")
with Horizontal(id="add-device-button-container"):
yield Button("Add Device", id="add-device", classes="button-100")
for device in self.devices:
yield Device(device, tooltip="Click to edit")
def new_devices(self, device_data) -> None:
if device_data:
device_widget = None
for i in device_data:
self.devices.append(i)
device_widget = Device(i, tooltip="Click to edit")
self.mount(device_widget)
device_widget.focus(scroll_visible=True)
def edit_device(self, device_widget: Element) -> None:
device_widget.process_values_from_data()
device_widget.query_one("#element-name").label = device_widget.element_name
@on(Button.Pressed, "#element-remove")
def remove_channel(self, event: Button.Pressed):
channel_to_remove: Element = event.button.parent
self.config.channel_whitelist.remove(channel_to_remove.element_data)
channel_to_remove.remove()
@on(Button.Pressed, "#add-device")
def add_device(self, event: Button.Pressed):
self.app.push_screen(AddDevice(self.config), callback=self.new_devices)
@on(Button.Pressed, "#element-name")
def edit_channel(self, event: Button.Pressed):
channel_to_edit: Element = event.button.parent
self.app.push_screen(EditDevice(channel_to_edit), callback=self.edit_device)
class ApiKeyManager(Vertical):
"""Manager for the YouTube Api Key."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
def compose(self) -> ComposeResult:
yield Label("YouTube Api Key", classes="title")
yield Label(
"You can get a YouTube Api Key from the [link=https://console.developers.google.com/apis/credentials]Google Cloud Console[/link]")
with Grid(id="api-key-grid"):
yield Input(placeholder="YouTube Api Key", id="api-key-input", password=True, value=self.config.apikey)
yield Button("Show key", id="api-key-view")
@on(Input.Changed, "#api-key-input")
def changed_api_key(self, event: Input.Changed):
self.config.apikey = event.input.value
# try: # ChannelWhitelist might not be mounted
# self.app.query_one("#warning-no-key").display = not self.config.apikey
# except:
# pass
@on(Button.Pressed, "#api-key-view")
def pressed_api_key_view(self, event: Button.Pressed):
if "Show" in event.button.label:
event.button.label = "Hide key"
self.query_one("#api-key-input").password = False
else:
event.button.label = "Show key"
self.query_one("#api-key-input").password = True
class SkipCategoriesManager(Vertical):
"""Manager for skip categories, allows to select which categories to skip."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
def compose(self) -> ComposeResult:
yield Label("Skip Categories", classes="title")
yield Label("Select the categories you want to skip", classes="subtitle")
skip_categories_parsed = []
for i in skip_categories:
name, value = i
if value in self.config.skip_categories:
skip_categories_parsed.append((name, value, True))
else:
skip_categories_parsed.append((name, value, False))
# print(skip_categories_parsed)
yield SelectionList(*skip_categories_parsed, id="skip-categories-compact-list")
@on(SelectionList.SelectedChanged, "#skip-categories-compact-list")
def changed_skip_categories(self, event: SelectionList.SelectedChanged):
self.config.skip_categories = event.selection_list.selected
class SkipCountTrackingManager(Vertical):
"""Manager for skip count tracking, allows to enable/disable skip count tracking."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
def compose(self) -> ComposeResult:
yield Label("Skip count tracking", classes="title")
yield Label(
"This feature tracks which segments you have skipped to let users know how much their submission has helped others and used as a metric along with upvotes to ensure that spam doesn't get into the database. The program sends a message to the sponsor block server each time you skip a segment. Hopefully most people don't change this setting so that the view numbers are accurate. :)",
classes="subtitle", id="skip-count-tracking-subtitle")
yield Checkbox(value=self.config.skip_count_tracking, id="skip-count-tracking-switch",
label="Enable skip count tracking")
@on(Checkbox.Changed, "#skip-count-tracking-switch")
def changed_skip_tracking(self, event: Checkbox.Changed):
self.config.skip_count_tracking = event.checkbox.value
class AdSkipMuteManager(Vertical):
"""Manager for ad skip/mute, allows to enable/disable ad skip/mute."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
def compose(self) -> ComposeResult:
yield Label("Skip/Mute ads", classes="title")
yield Label(
"This feature allows you to automatically mute and/or skip native YouTube ads. Skipping ads only works if that ad shows the 'Skip Ad' button, if it doesn't then it will only be able to be muted.",
classes="subtitle", id="skip-count-tracking-subtitle")
with Horizontal(id="ad-skip-mute-container"):
yield Checkbox(value=self.config.mute_ads, id="mute-ads-switch",
label="Enable skipping ads")
yield Checkbox(value=self.config.skip_ads, id="skip-ads-switch",
label="Enable muting ads")
@on(Checkbox.Changed, "#mute-ads-switch")
def changed_mute(self, event: Checkbox.Changed):
self.config.mute_ads = event.checkbox.value
@on(Checkbox.Changed, "#skip-ads-switch")
def changed_skip(self, event: Checkbox.Changed):
self.config.skip_ads = event.checkbox.value
class ChannelWhitelistManager(Vertical):
"""Manager for channel whitelist, allows to add/remove channels from the whitelist."""
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
def compose(self) -> ComposeResult:
yield Label("Channel Whitelist", classes="title")
yield Label(
"This feature allows to whitelist channels from being skipped. This feature is automatically disabled when no channels have been specified.",
classes="subtitle", id="channel-whitelist-subtitle")
yield Label(":warning: [#FF0000]You need to set your YouTube Api Key in order to use this feature",
id="warning-no-key")
with Horizontal(id="add-channel-button-container"):
yield Button("Add Channel", id="add-channel", classes="button-100")
for channel in self.config.channel_whitelist:
yield Channel(channel)
def on_mount(self) -> None:
self.app.query_one("#warning-no-key").display = (not self.config.apikey) and bool(self.config.channel_whitelist)
def new_channel(self, channel: tuple) -> None:
if channel:
channel_dict = {
"id": channel[0],
"name": channel[1],
}
self.config.channel_whitelist.append(channel_dict)
channel_widget = Channel(channel_dict)
self.mount(channel_widget)
channel_widget.focus(scroll_visible=True)
self.app.query_one("#warning-no-key").display = (not self.config.apikey) and bool(
self.config.channel_whitelist)
@on(Button.Pressed, "#element-remove")
def remove_channel(self, event: Button.Pressed):
channel_to_remove: Element = event.button.parent
self.config.channel_whitelist.remove(channel_to_remove.element_data)
channel_to_remove.remove()
self.app.query_one("#warning-no-key").display = (not self.config.apikey) and bool(
self.config.channel_whitelist)
@on(Button.Pressed, "#add-channel")
def add_channel(self, event: Button.Pressed):
self.app.push_screen(AddChannel(self.config), callback=self.new_channel)
class iSponsorBlockTVSetupMainScreen(Screen):
"""Making this a separate screen to avoid a bug: https://github.com/Textualize/textual/issues/3221"""
TITLE = "iSponsorBlockTV"
SUB_TITLE = "Setup Wizard"
BINDINGS = [
("q,ctrl+c", "exit_modal", "Exit"),
("s", "save", "Save")
]
AUTO_FOCUS = None
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.dark = True
self.config = config
self.initial_config = copy.deepcopy(config)
def compose(self) -> ComposeResult:
yield Header()
yield Footer()
with ScrollableContainer(id="setup-wizard"):
yield DevicesManager(config=self.config, id="devices-manager", classes="container")
yield SkipCategoriesManager(config=self.config, id="skip-categories-manager", classes="container")
yield SkipCountTrackingManager(config=self.config, id="count-segments-manager", classes="container")
yield AdSkipMuteManager(config=self.config, id="ad-skip-mute-manager", classes="container")
yield ChannelWhitelistManager(config=self.config, id="channel-whitelist-manager", classes="container")
yield ApiKeyManager(config=self.config, id="api-key-manager", classes="container")
def on_mount(self) -> None:
if self.check_for_old_config_entries():
self.app.push_screen(MigrationScreen())
pass
def action_save(self) -> None:
self.config.save()
self.initial_config = copy.deepcopy(self.config)
def action_exit_modal(self) -> None:
if self.config != self.initial_config:
self.app.push_screen(ExitScreen())
else: # No changes were made
self.app.exit()
def check_for_old_config_entries(self) -> bool:
if hasattr(self.config, "atvs"):
return True
return False
@on(Input.Changed, "#api-key-input")
def changed_api_key(self, event: Input.Changed):
print("HIIII")
try: # ChannelWhitelist might not be mounted
# Show if no api key is set and at least one channel is in the whitelist
self.app.query_one("#warning-no-key").display = (not event.input.value) and self.config.channel_whitelist
except:
pass
class iSponsorBlockTVSetup(App):
CSS_PATH = "setup-wizard-style.tcss" # tcss is the recommended extension for textual css files
# Bindings for the whole app here, so they are available in all screens
BINDINGS = [
("q,ctrl+c", "exit_modal", "Exit"),
("s", "save", "Save")
]
def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config
self.main_screen = iSponsorBlockTVSetupMainScreen(config=self.config)
def on_mount(self) -> None:
self.push_screen(self.main_screen)
def action_save(self) -> None:
self.main_screen.action_save()
def action_exit_modal(self) -> None:
self.main_screen.action_exit_modal()
def main(config):
app = iSponsorBlockTVSetup(config)
app.run()

114
iSponsorBlockTV/ytlounge.py Normal file
View File

@@ -0,0 +1,114 @@
import asyncio
import aiohttp
import pyytlounge
create_task = asyncio.create_task
class YtLoungeApi(pyytlounge.YtLoungeApi):
def __init__(self, screen_id, config=None, api_helper=None):
super().__init__("iSponsorBlockTV")
self.auth.screen_id = screen_id
self.auth.lounge_id_token = None
self.api_helper = api_helper
self.volume_state = {}
self.subscribe_task = None
self.subscribe_task_watchdog = None
self.callback = None
if config:
self.mute_ads = config.mute_ads
self.skip_ads = config.skip_ads
# Ensures that we still are subscribed to the lounge
async def _watchdog(self):
await asyncio.sleep(35) # YouTube sends at least a message every 30 seconds (no-op or any other)
try:
self.subscribe_task.cancel()
except Exception as e:
pass
# Subscribe to the lounge and start the watchdog
async def subscribe_monitored(self, callback):
self.callback = callback
try:
self.subscribe_task_watchdog.cancel()
except:
pass # No watchdog task
self.subscribe_task = asyncio.create_task(super().subscribe(callback))
self.subscribe_task_watchdog = asyncio.create_task(self._watchdog())
return self.subscribe_task
# Process a lounge subscription event
def _process_event(self, event_id: int, event_type: str, args):
# print(f"YtLoungeApi.__process_event({event_id}, {event_type}, {args})")
# (Re)start the watchdog
try:
self.subscribe_task_watchdog.cancel()
except:
pass
finally:
self.subscribe_task_watchdog = asyncio.create_task(self._watchdog())
# A bunch of events useful to detect ads playing, and the next video before it starts playing (that way we can get the segments)
if event_type == "onStateChange":
data = args[0]
self.state.apply_state(data)
self._update_state()
# print(data)
# Unmute when the video starts playing
if self.mute_ads and data["state"] == "1":
create_task(self.mute(False, override=True))
elif event_type == "nowPlaying":
data = args[0]
self.state = pyytlounge.PlaybackState(self._logger, data)
self._update_state()
# Unmute when the video starts playing
if self.mute_ads and data.get("state", "0") == "1":
create_task(self.mute(False, override=True))
elif self.mute_ads and event_type == "onAdStateChange":
data = args[0]
if data["adState"] == '0': # Ad is not playing
create_task(self.mute(False, override=True))
else: # Seen multiple other adStates, assuming they are all ads
create_task(self.mute(True, override=True))
# Manages volume, useful since YouTube wants to know the volume when unmuting (even if they already have it)
elif event_type == "onVolumeChanged":
self.volume_state = args[0]
pass
# Gets segments for the next video before it starts playing
elif event_type == "autoplayUpNext":
if len(args) > 0 and (vid_id := args[0]["videoId"]): # if video id is not empty
create_task(self.api_helper.get_segments(vid_id))
# #Used to know if an ad is skippable or not
elif event_type == "adPlaying":
data = args[0]
# Gets segments for the next video (after the ad) before it starts playing
if vid_id := data["contentVideoId"]:
create_task(self.api_helper.get_segments(vid_id))
if data["isSkippable"] == "true": # YouTube uses strings for booleans
if self.skip_ads:
create_task(self.skip_ad())
create_task(self.mute(False, override=True))
elif self.mute_ads:
create_task(self.mute(True, override=True))
else:
super()._process_event(event_id, event_type, args)
# Set the volume to a specific value (0-100)
async def set_volume(self, volume: int) -> None:
await super()._command("setVolume", {"volume": volume})
# Mute or unmute the device (if the device already is in the desired state, nothing happens)
# mute: True to mute, False to unmute
# override: If True, the command is sent even if the device already is in the desired state
# TODO: Only works if the device is subscribed to the lounge
async def mute(self, mute: bool, override: bool = False) -> None:
if mute:
mute_str = "true"
else:
mute_str = "false"
if override or not (self.volume_state.get("muted", "false") == mute_str):
self.volume_state["muted"] = mute_str
# YouTube wants the volume when unmuting, so we send it
await super()._command("setVolume", {"volume": self.volume_state.get("volume", 100), "muted": mute_str})

5
main_tui.py Normal file
View File

@@ -0,0 +1,5 @@
from iSponsorBlockTV import setup_wizard
from iSponsorBlockTV.helpers import Config
config = Config("config.json")
setup_wizard.main(config)

View File

@@ -1,6 +1,10 @@
miniaudio==1.57 miniaudio==1.57
pyatv
aiohttp aiohttp
aiodns
async-cache async-cache
argparse argparse
pyytlounge
textual
textual-slider
ssdp
rich
xmltodict