mirror of
https://github.com/dmunozv04/iSponsorBlockTV.git
synced 2025-12-06 11:56:45 +03:00
138 lines
4.5 KiB
Python
138 lines
4.5 KiB
Python
import sys
|
|
import asyncio
|
|
from webbrowser import get
|
|
import pyatv
|
|
import aiohttp
|
|
from async_lru import alru_cache
|
|
from datetime import timedelta
|
|
import json
|
|
|
|
|
|
def listToTuple(function):
|
|
def wrapper(*args):
|
|
args = [tuple(x) if type(x) == list else x for x in args]
|
|
result = function(*args)
|
|
result = tuple(result) if type(result) == list else result
|
|
return result
|
|
return wrapper
|
|
|
|
class MyPushListener(pyatv.interface.PushListener):
|
|
task = None
|
|
apikey = None
|
|
rc = None
|
|
|
|
web_session = None
|
|
categories = ["sponsor"]
|
|
|
|
def __init__(self, apikey, atv, categories):
|
|
self.apikey = apikey
|
|
self.rc = atv.remote_control
|
|
self.app = atv.metadata.app
|
|
self.web_session = aiohttp.ClientSession()
|
|
self.categories = categories
|
|
|
|
|
|
def playstatus_update(self, updater, playstatus):
|
|
try:
|
|
self.task.cancel()
|
|
except:
|
|
pass
|
|
self.task = asyncio.create_task(process_playstatus(playstatus, self.apikey, self.rc, self.app, self.web_session, self.categories))
|
|
|
|
def playstatus_error(self, updater, exception):
|
|
print(exception)
|
|
print("stopped")
|
|
# Error in exception
|
|
|
|
async def process_playstatus(playstatus, apikey, rc, app, web_session, categories):
|
|
if playstatus.device_state == playstatus.device_state.Playing and app.identifier == "com.google.ios.youtube":
|
|
vid_id = await get_vid_id(playstatus.title, playstatus.artist, apikey, web_session)
|
|
print(vid_id)
|
|
segments, duration = await get_segments(vid_id, web_session, categories)
|
|
print(segments)
|
|
await time_to_segment(segments, playstatus.position, rc)
|
|
|
|
|
|
@alru_cache(maxsize = 5)
|
|
async def get_vid_id(title, artist, api_key, web_session):
|
|
url = f"https://youtube.googleapis.com/youtube/v3/search?q={title} - {artist}&key={api_key}&maxResults=1"
|
|
async with web_session.get(url) as response:
|
|
response = await response.json()
|
|
vid_id = response["items"][0]["id"]["videoId"]
|
|
return vid_id
|
|
|
|
@listToTuple
|
|
@alru_cache(maxsize = 5)
|
|
async def get_segments(vid_id, web_session, categories = ["sponsor"]):
|
|
params = {"videoID": vid_id,
|
|
"category": categories,
|
|
"actionType": "skip",
|
|
"service": "youtube"}
|
|
headers = {'Accept': 'application/json'}
|
|
url = "https://sponsor.ajay.app/api/skipSegments"
|
|
async with web_session.get(url, headers = headers, params = params) as response:
|
|
response = await response.json()
|
|
segments = []
|
|
try:
|
|
duration = response[0]["videoDuration"]
|
|
for i in response:
|
|
segments.append(i["segment"])
|
|
except:
|
|
duration = 0
|
|
return segments, duration
|
|
|
|
|
|
async def time_to_segment(segments, position, rc):
|
|
future_segments = []
|
|
for i in segments:
|
|
if i[0] > position:
|
|
future_segments.append(i)
|
|
next_segement = future_segments[0]
|
|
time_to_next = next_segement[0] - position
|
|
await skip(time_to_next, next_segement[1], rc)
|
|
|
|
async def skip(time_to, position, rc):
|
|
await asyncio.sleep(time_to)
|
|
await rc.set_position(position)
|
|
|
|
|
|
async def connect_atv(loop, identifier, airplay_credentials):
|
|
"""Find a device and print what is playing."""
|
|
print("Discovering devices on network...")
|
|
atvs = await pyatv.scan(loop, identifier = identifier)
|
|
|
|
if not atvs:
|
|
print("No device found", file=sys.stderr)
|
|
return
|
|
|
|
config = atvs[0]
|
|
config.set_credentials(pyatv.Protocol.AirPlay, airplay_credentials)
|
|
|
|
print(f"Connecting to {config.address}")
|
|
return await pyatv.connect(config, loop)
|
|
|
|
|
|
async def loop_atv(event_loop, atv_config, apikey, categories):
|
|
identifier = atv_config["identifier"]
|
|
airplay_credentials = atv_config["airplay_credentials"]
|
|
atv = await connect_atv(event_loop, identifier, airplay_credentials)
|
|
|
|
|
|
listener = MyPushListener(apikey, atv, categories)
|
|
try:
|
|
atv.push_updater.listener = listener
|
|
atv.push_updater.start()
|
|
print("Press ENTER to quit")
|
|
await event_loop.run_in_executor(None, sys.stdin.readline)
|
|
except:
|
|
atv.close()
|
|
|
|
def load_config(config_file="config/config.json"):
|
|
with open(config_file) as f:
|
|
config = json.load(f)
|
|
return config["atvs"][0], config["apikey"], config["skip_categories"]
|
|
|
|
if __name__ == "__main__":
|
|
atv_config, apikey, categories = load_config()
|
|
event_loop = asyncio.get_event_loop()
|
|
event_loop.run_until_complete(loop_atv(event_loop, atv_config, apikey, categories)) |