diff --git a/.DS_Store b/.DS_Store index 44ab99e..0c99c40 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/.gitignore b/.gitignore index 288dc02..a060794 100644 --- a/.gitignore +++ b/.gitignore @@ -35,6 +35,9 @@ MANIFEST pip-log.txt pip-delete-this-directory.txt +# macOS +*.DS_Store + # Unit test / coverage reports htmlcov/ .tox/ diff --git a/iSponsorBlockTV/api_helpers.py b/iSponsorBlockTV/api_helpers.py new file mode 100644 index 0000000..ae05041 --- /dev/null +++ b/iSponsorBlockTV/api_helpers.py @@ -0,0 +1,89 @@ +from cache import AsyncTTL, AsyncLRU +from . import constants +from hashlib import sha256 +from asyncio import create_task +import html + + +def listToTuple(function): + def wrapper(*args): + args = [tuple(x) if type(x) == list else x for x in args] + result = function(*args) + result = tuple(result) if type(result) == list else result + return result + + return wrapper + + +@AsyncLRU(maxsize=10) +async def get_vid_id(title, artist, api_key, web_session): + params = {"q": title + " " + artist, "key": api_key, "part": "snippet"} + url = constants.Youtube_api + "search" + async with web_session.get(url, params=params) as resp: + data = await resp.json() + for i in data["items"]: + title_api = html.unescape(i["snippet"]["title"]) + artist_api = html.unescape(i["snippet"]["channelTitle"]) + if title_api == title and artist_api == artist: + return i["id"]["videoId"] + return + + +@listToTuple +@AsyncTTL(time_to_live=300, maxsize=5) +async def get_segments(vid_id, web_session, categories=["sponsor"]): + vid_id_hashed = sha256(vid_id.encode("utf-8")).hexdigest()[ + :4 + ] # Hashes video id and get the first 4 characters + params = { + "category": categories, + "actionType": constants.SponsorBlock_actiontype, + "service": constants.SponsorBlock_service, + } + headers = {"Accept": "application/json"} + url = constants.SponsorBlock_api + "skipSegments/" + vid_id_hashed + async with web_session.get(url, headers=headers, params=params) as response: + response = await response.json() + for i in response: + if str(i["videoID"]) == str(vid_id): + response = i + break + segments = [] + try: + for i in response["segments"]: + segment = i["segment"] + UUID = i["UUID"] + segment_dict = {"start": segment[0], "end": segment[1], "UUID": [UUID]} + try: + # Get segment before to check if they are too close to each other + segment_before_end = segments[-1]["end"] + segment_before_start = segments[-1]["start"] + segment_before_UUID = segments[-1]["UUID"] + + except: + segment_before_end = -10 + if ( + segment_dict["start"] - segment_before_end < 1 + ): # Less than 1 second appart, combine them and skip them together + segment_dict["start"] = segment_before_start + segment_dict["UUID"].append(segment_before_UUID) + segments.pop() + segments.append(segment_dict) + except: + pass + return segments + + +async def viewed_segments(UUID, web_session): + url = constants.SponsorBlock_api + "viewedVideoSponsorTime/" + for i in UUID: + create_task(mark_viewed_segment(i, web_session)) + return + + +async def mark_viewed_segment(UUID, web_session): + url = constants.SponsorBlock_api + "viewedVideoSponsorTime/" + params = {"UUID": UUID} + async with web_session.post(url, params=params) as response: + response_text = await response.text() + return diff --git a/iSponsorBlockTV/config_setup.py b/iSponsorBlockTV/config_setup.py index d777375..b6a1b39 100644 --- a/iSponsorBlockTV/config_setup.py +++ b/iSponsorBlockTV/config_setup.py @@ -9,13 +9,15 @@ def save_config(config, config_file): with open(config_file, "w") as f: json.dump(config, f) -#Taken from postlund/pyatv atvremote.py + +# Taken from postlund/pyatv atvremote.py async def _read_input(loop: asyncio.AbstractEventLoop, prompt: str): sys.stdout.write(prompt) sys.stdout.flush() user_input = await loop.run_in_executor(None, sys.stdin.readline) return user_input.strip() + async def find_atvs(loop): devices = await pyatv.scan(loop) if not devices: @@ -23,14 +25,20 @@ async def find_atvs(loop): return atvs = [] for i in devices: - #Only get Apple TV's - if i.device_info.model in [DeviceModel.Gen4, DeviceModel.Gen4K, DeviceModel.AppleTV4KGen2]: - #if i.device_info.model in [DeviceModel.AppleTV4KGen2]: #FOR TESTING + # Only get Apple TV's + if i.device_info.model in [ + DeviceModel.Gen4, + DeviceModel.Gen4K, + DeviceModel.AppleTV4KGen2, + ]: + # if i.device_info.model in [DeviceModel.AppleTV4KGen2]: #FOR TESTING if input("Found {}. Do you want to add it? (y/n) ".format(i.name)) == "y": identifier = i.identifier - pairing = await pyatv.pair(i, loop=loop, protocol=pyatv.Protocol.AirPlay) + pairing = await pyatv.pair( + i, loop=loop, protocol=pyatv.Protocol.AirPlay + ) await pairing.begin() if pairing.device_provides_pin: pin = await _read_input(loop, "Enter PIN on screen: ") @@ -39,18 +47,23 @@ async def find_atvs(loop): await pairing.finish() if pairing.has_paired: creds = pairing.service.credentials - atvs.append({"identifier": identifier, "airplay_credentials": creds}) + atvs.append( + {"identifier": identifier, "airplay_credentials": creds} + ) print("Pairing successful") await pairing.close() return atvs - - - - + + def main(config, config_file, debug): - try: num_atvs = len(config["atvs"]) - except: num_atvs = 0 - if input("Found {} Apple TV(s) in config.json. Add more? (y/n) ".format(num_atvs)) == "y": + try: + num_atvs = len(config["atvs"]) + except: + num_atvs = 0 + if ( + input("Found {} Apple TV(s) in config.json. Add more? (y/n) ".format(num_atvs)) + == "y" + ): loop = asyncio.get_event_loop_policy().get_event_loop() if debug: loop.set_debug(True) @@ -62,39 +75,42 @@ def main(config, config_file, debug): for i in atvs: config["atvs"].append(i) print("done adding") - except: + except: print("rewriting atvs (don't worry if none were saved before)") config["atvs"] = atvs - - try : apikey = config["apikey"] - except: + + try: + apikey = config["apikey"] + except: apikey = "" - if apikey != "" : + if apikey != "": if input("Apikey already specified. Change it? (y/n) ") == "y": apikey = input("Enter your API key: ") config["apikey"] = apikey else: - print("get youtube apikey here: https://developers.google.com/youtube/registering_an_application") + print( + "get youtube apikey here: https://developers.google.com/youtube/registering_an_application" + ) apikey = input("Enter your API key: ") config["apikey"] = apikey - - try: skip_categories = config["skip_categories"] - except: + + try: + skip_categories = config["skip_categories"] + except: skip_categories = [] - + if skip_categories != []: if input("Skip categories already specified. Change them? (y/n) ") == "y": - categories = input("Enter skip categories (space sepparated) Options: [sponsor, selfpromo, exclusive_access, interaction, poi_highlight, intro, outro, preview, filler, music_offtopic:\n") + categories = input( + "Enter skip categories (space sepparated) Options: [sponsor, selfpromo, exclusive_access, interaction, poi_highlight, intro, outro, preview, filler, music_offtopic:\n" + ) skip_categories = categories.split(" ") else: - categories = input("Enter skip categories (space sepparated) Options: [sponsor, selfpromo, exclusive_access, interaction, poi_highlight, intro, outro, preview, filler, music_offtopic:\n") + categories = input( + "Enter skip categories (space sepparated) Options: [sponsor, selfpromo, exclusive_access, interaction, poi_highlight, intro, outro, preview, filler, music_offtopic:\n" + ) skip_categories = categories.split(" ") config["skip_categories"] = skip_categories + print("config finished") save_config(config, config_file) - - - -if __name__ == "__main__": - print("starting") - main() diff --git a/iSponsorBlockTV/constants.py b/iSponsorBlockTV/constants.py new file mode 100644 index 0000000..ad07205 --- /dev/null +++ b/iSponsorBlockTV/constants.py @@ -0,0 +1,6 @@ +userAgent = "iSponsorBlockTV/0.1" +SponsorBlock_service = "youtube" +SponsorBlock_actiontype = "skip" + +SponsorBlock_api = "https://sponsor.ajay.app/api/" +Youtube_api = "https://www.googleapis.com/youtube/v3/" diff --git a/iSponsorBlockTV/helpers.py b/iSponsorBlockTV/helpers.py index bc4b507..737c3de 100644 --- a/iSponsorBlockTV/helpers.py +++ b/iSponsorBlockTV/helpers.py @@ -5,42 +5,51 @@ from . import macos_install import json import os import logging +import sys def load_config(config_file): - try: - with open(config_file) as f: - config = json.load(f) - except: - if os.getenv('iSPBTV_docker'): - print("You are running in docker, you have to mount the config file.\nPlease check the README.md for more information.") + if os.path.exists(config_file): + try: + with open(config_file, "r") as f: + config = json.load(f) + except: + print("Creating config file") + config = {} + else: + if os.getenv("iSPBTV_docker"): + print( + "You are running in docker, you have to mount the config file.\nPlease check the README.md for more information." + ) + sys.exit() + return else: - config = {} #Create blank config to setup + print("Creating config file") + config = {} # Create blank config to setup return config def app_start(): - parser = argparse.ArgumentParser(description='iSponsorblockTV') - parser.add_argument('--file', '-f', default='config.json', help='config file') - parser.add_argument('--setup', '-s', action='store_true', help='setup the program') - parser.add_argument('--debug', '-d', action='store_true', help='debug mode') - parser.add_argument('--macos_install', action='store_true', help='install in macOS') + parser = argparse.ArgumentParser(description="iSponsorblockTV") + parser.add_argument("--file", "-f", default="config.json", help="config file") + parser.add_argument("--setup", "-s", action="store_true", help="setup the program") + parser.add_argument("--debug", "-d", action="store_true", help="debug mode") + parser.add_argument("--macos_install", action="store_true", help="install in macOS") args = parser.parse_args() - + config = load_config(args.file) if args.debug: logging.basicConfig(level=logging.DEBUG) - if args.setup: #Setup the config file + if args.setup: # Setup the config file config_setup.main(config, args.file, args.debug) - if args.macos_install: + elif args.macos_install: macos_install.main() - - else: - try: #Check if config file has the correct structure - config["atvs"], config["apikey"], config["skip_categories"] - except: #If not, ask to setup the program - print("invalid config file, please run with --setup") - os.exit() - main.main(config["atvs"], config["apikey"], config["skip_categories"], args.debug) -if __name__ == "__main__": - app_start() \ No newline at end of file + else: + try: # Check if config file has the correct structure + config["atvs"], config["apikey"], config["skip_categories"] + except: # If not, ask to setup the program + print("invalid config file, please run with --setup") + sys.exit() + main.main( + config["atvs"], config["apikey"], config["skip_categories"], args.debug + ) diff --git a/iSponsorBlockTV/macos_install.py b/iSponsorBlockTV/macos_install.py index f1a963c..6429368 100644 --- a/iSponsorBlockTV/macos_install.py +++ b/iSponsorBlockTV/macos_install.py @@ -2,15 +2,18 @@ import plistlib import os from . import config_setup -default_plist = {"Label": "com.dmunozv04iSponsorBlockTV", - "RunAtLoad": True, - "StartInterval": 20, - "EnvironmentVariables": {"PYTHONUNBUFFERED": "YES"}, - "StandardErrorPath": "", #Fill later - "StandardOutPath": "", - "ProgramArguments" : "", - "WorkingDirectory": "" - } +default_plist = { + "Label": "com.dmunozv04iSponsorBlockTV", + "RunAtLoad": True, + "StartInterval": 20, + "EnvironmentVariables": {"PYTHONUNBUFFERED": "YES"}, + "StandardErrorPath": "", # Fill later + "StandardOutPath": "", + "ProgramArguments": "", + "WorkingDirectory": "", +} + + def create_plist(path): plist = default_plist plist["ProgramArguments"] = [path + "/iSponsorBlockTV-macos"] @@ -19,14 +22,16 @@ def create_plist(path): plist["WorkingDirectory"] = path launchd_path = os.path.expanduser("~/Library/LaunchAgents/") path_to_save = launchd_path + "com.dmunozv04.iSponsorBlockTV.plist" - - with open(path_to_save, 'wb') as fp: + + with open(path_to_save, "wb") as fp: plistlib.dump(plist, fp) + def run_setup(file): config = {} config_setup.main(config, file, debug=False) - + + def main(): correct_path = os.path.expanduser("~/iSponsorBlockTV") if os.path.isfile(correct_path + "/iSponsorBlockTV-macos"): @@ -34,13 +39,15 @@ def main(): print("The launch daemon will now be installed") create_plist(correct_path) run_setup(correct_path + "/config.json") - print("Launch daemon installed. Please restart the computer to enable it or use:\n launchctl load ~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist") + print( + "Launch daemon installed. Please restart the computer to enable it or use:\n launchctl load ~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist" + ) else: if not os.path.exists(correct_path): os.makedirs(correct_path) - print("Please move the program to the correct path: " + correct_path + "opeing now on finder...") + print( + "Please move the program to the correct path: " + + correct_path + + "opeing now on finder..." + ) os.system("open -R " + correct_path) - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/iSponsorBlockTV/main.py b/iSponsorBlockTV/main.py index e59f4c8..1fceb88 100644 --- a/iSponsorBlockTV/main.py +++ b/iSponsorBlockTV/main.py @@ -1,9 +1,10 @@ import asyncio import pyatv import aiohttp -from cache import AsyncTTL import time import logging +from . import api_helpers + def listToTuple(function): def wrapper(*args): @@ -11,13 +12,15 @@ def listToTuple(function): result = function(*args) result = tuple(result) if type(result) == list else result return result + return wrapper + class MyPushListener(pyatv.interface.PushListener): task = None apikey = None rc = None - + web_session = None categories = ["sponsor"] @@ -27,8 +30,7 @@ class MyPushListener(pyatv.interface.PushListener): self.web_session = web_session self.categories = categories self.atv = atv - - + def playstatus_update(self, updater, playstatus): logging.debug("Playstatus update" + str(playstatus)) try: @@ -36,84 +38,70 @@ class MyPushListener(pyatv.interface.PushListener): except: pass time_start = time.time() - self.task = asyncio.create_task(process_playstatus(playstatus, self.apikey, self.rc, self.web_session, self.categories, self.atv, time_start)) + self.task = asyncio.create_task( + process_playstatus( + playstatus, + self.apikey, + self.rc, + self.web_session, + self.categories, + self.atv, + time_start, + ) + ) + def playstatus_error(self, updater, exception): logging.error(exception) print("stopped") - -async def process_playstatus(playstatus, apikey, rc, web_session, categories, atv, time_start): +async def process_playstatus( + playstatus, apikey, rc, web_session, categories, atv, time_start +): logging.debug("App playing is:" + str(atv.metadata.app.identifier)) - if playstatus.device_state == playstatus.device_state.Playing and atv.metadata.app.identifier == "com.google.ios.youtube": - vid_id = await get_vid_id(playstatus.title, playstatus.artist, apikey, web_session) - print(vid_id) - segments, duration = await get_segments(vid_id, web_session, categories) - print(segments) - await time_to_segment(segments, playstatus.position, rc, time_start) - - -@AsyncTTL(time_to_live=300, maxsize=5) -async def get_vid_id(title, artist, api_key, web_session): - url = f"https://youtube.googleapis.com/youtube/v3/search?q={title} - {artist}&key={api_key}&maxResults=1" - async with web_session.get(url) as response: - response = await response.json() - vid_id = response["items"][0]["id"]["videoId"] - return vid_id - -@listToTuple -@AsyncTTL(time_to_live=300, maxsize=5) -async def get_segments(vid_id, web_session, categories = ["sponsor"]): - params = {"videoID": vid_id, - "category": categories, - "actionType": "skip", - "service": "youtube"} - headers = {'Accept': 'application/json'} - url = "https://sponsor.ajay.app/api/skipSegments" - async with web_session.get(url, headers = headers, params = params) as response: - response = await response.json() - segments = [] - try: - duration = response[0]["videoDuration"] - for i in response: - segment = i["segment"] - try: - #Get segment before to check if they are too close to each other - segment_before_end = segments[-1][1] - segment_before_start = segments[-1][0] - - except: - segment_before_end = -10 - if segment[0] - segment_before_end < 1: #Less than 1 second appart, combine them and skip them together - segment = [segment_before_start, segment[1]] - segments.pop() - segments.append(segment) - except: - duration = 0 - return segments, duration + if ( + playstatus.device_state == playstatus.device_state.Playing + and atv.metadata.app.identifier == "com.google.ios.youtube" + ): + vid_id = await api_helpers.get_vid_id( + playstatus.title, playstatus.artist, apikey, web_session + ) + if vid_id: + print(vid_id) + segments = await api_helpers.get_segments(vid_id, web_session, categories) + print(segments) + await time_to_segment( + segments, playstatus.position, rc, time_start, web_session + ) + else: + print("Could not find video id") -async def time_to_segment(segments, position, rc, time_start): +async def time_to_segment(segments, position, rc, time_start, web_session): position = position + (time.time() - time_start) for segment in segments: - if position < 2 and (position >= segment[0] and position < segment[1]): - next_segment = [position, segment[1]] + if position < 2 and ( + position >= segment["start"] and position < segment["end"] + ): + next_segment = [position, segment["end"]] break - if segment[0] > position: + if segment["start"] > position: next_segment = segment break - time_to_next = next_segment[0] - position - await skip(time_to_next, next_segment[1], rc) + time_to_next = next_segment["start"] - position + await skip(time_to_next, next_segment["end"], next_segment["UUID"], rc, web_session) -async def skip(time_to, position, rc): + +async def skip(time_to, position, UUID, rc, web_session): await asyncio.sleep(time_to) await rc.set_position(position) + # await api_helpers.viewed_segments(UUID, web_session) DISABLED FOR NOW async def connect_atv(loop, identifier, airplay_credentials): """Find a device and print what is playing.""" print("Discovering devices on network...") - atvs = await pyatv.scan(loop, identifier = identifier) + atvs = await pyatv.scan(loop, identifier=identifier) if not atvs: print("No device found, will retry") @@ -142,7 +130,7 @@ async def loop_atv(event_loop, atv_config, apikey, categories, web_session): atv.metadata.app except: print("Reconnecting to Apple TV") - #reconnect to apple tv + # reconnect to apple tv atv = await connect_atv(event_loop, identifier, airplay_credentials) if atv: listener = MyPushListener(apikey, atv, categories, web_session) @@ -150,10 +138,6 @@ async def loop_atv(event_loop, atv_config, apikey, categories, web_session): atv.push_updater.listener = listener atv.push_updater.start() print("Push updater started") - - - - def main(atv_configs, apikey, categories, debug): @@ -165,8 +149,3 @@ def main(atv_configs, apikey, categories, debug): for i in atv_configs: loop.create_task(loop_atv(loop, i, apikey, categories, web_session)) loop.run_forever() - - -if __name__ == "__main__": - print("starting") - main() diff --git a/main-macos.py b/main-macos.py index 2264d52..ac7e117 100644 --- a/main-macos.py +++ b/main-macos.py @@ -2,6 +2,6 @@ from iSponsorBlockTV import helpers import sys import os -if getattr(sys, 'frozen', False): - os.environ['SSL_CERT_FILE'] = os.path.join(sys._MEIPASS, 'lib', 'cert.pem') -helpers.app_start() \ No newline at end of file +if getattr(sys, "frozen", False): + os.environ["SSL_CERT_FILE"] = os.path.join(sys._MEIPASS, "lib", "cert.pem") +helpers.app_start() diff --git a/main.py b/main.py index 2ec464a..2238ba2 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,3 @@ from iSponsorBlockTV import helpers -helpers.app_start() \ No newline at end of file +helpers.app_start()