Fix the docker bug #22

Make video ids hashed
Get 10 videos from youtube to make sure that the right id is found
Refactored the api calls
This commit is contained in:
dmunozv04
2022-04-22 14:45:32 +02:00
parent 713554e04d
commit 816b4ec9ef
10 changed files with 258 additions and 149 deletions

BIN
.DS_Store vendored

Binary file not shown.

3
.gitignore vendored
View File

@@ -35,6 +35,9 @@ MANIFEST
pip-log.txt pip-log.txt
pip-delete-this-directory.txt pip-delete-this-directory.txt
# macOS
*.DS_Store
# Unit test / coverage reports # Unit test / coverage reports
htmlcov/ htmlcov/
.tox/ .tox/

View File

@@ -0,0 +1,89 @@
from cache import AsyncTTL, AsyncLRU
from . import constants
from hashlib import sha256
from asyncio import create_task
import html
def listToTuple(function):
def wrapper(*args):
args = [tuple(x) if type(x) == list else x for x in args]
result = function(*args)
result = tuple(result) if type(result) == list else result
return result
return wrapper
@AsyncLRU(maxsize=10)
async def get_vid_id(title, artist, api_key, web_session):
params = {"q": title + " " + artist, "key": api_key, "part": "snippet"}
url = constants.Youtube_api + "search"
async with web_session.get(url, params=params) as resp:
data = await resp.json()
for i in data["items"]:
title_api = html.unescape(i["snippet"]["title"])
artist_api = html.unescape(i["snippet"]["channelTitle"])
if title_api == title and artist_api == artist:
return i["id"]["videoId"]
return
@listToTuple
@AsyncTTL(time_to_live=300, maxsize=5)
async def get_segments(vid_id, web_session, categories=["sponsor"]):
vid_id_hashed = sha256(vid_id.encode("utf-8")).hexdigest()[
:4
] # Hashes video id and get the first 4 characters
params = {
"category": categories,
"actionType": constants.SponsorBlock_actiontype,
"service": constants.SponsorBlock_service,
}
headers = {"Accept": "application/json"}
url = constants.SponsorBlock_api + "skipSegments/" + vid_id_hashed
async with web_session.get(url, headers=headers, params=params) as response:
response = await response.json()
for i in response:
if str(i["videoID"]) == str(vid_id):
response = i
break
segments = []
try:
for i in response["segments"]:
segment = i["segment"]
UUID = i["UUID"]
segment_dict = {"start": segment[0], "end": segment[1], "UUID": [UUID]}
try:
# Get segment before to check if they are too close to each other
segment_before_end = segments[-1]["end"]
segment_before_start = segments[-1]["start"]
segment_before_UUID = segments[-1]["UUID"]
except:
segment_before_end = -10
if (
segment_dict["start"] - segment_before_end < 1
): # Less than 1 second appart, combine them and skip them together
segment_dict["start"] = segment_before_start
segment_dict["UUID"].append(segment_before_UUID)
segments.pop()
segments.append(segment_dict)
except:
pass
return segments
async def viewed_segments(UUID, web_session):
url = constants.SponsorBlock_api + "viewedVideoSponsorTime/"
for i in UUID:
create_task(mark_viewed_segment(i, web_session))
return
async def mark_viewed_segment(UUID, web_session):
url = constants.SponsorBlock_api + "viewedVideoSponsorTime/"
params = {"UUID": UUID}
async with web_session.post(url, params=params) as response:
response_text = await response.text()
return

View File

@@ -9,13 +9,15 @@ def save_config(config, config_file):
with open(config_file, "w") as f: with open(config_file, "w") as f:
json.dump(config, f) json.dump(config, f)
#Taken from postlund/pyatv atvremote.py
# Taken from postlund/pyatv atvremote.py
async def _read_input(loop: asyncio.AbstractEventLoop, prompt: str): async def _read_input(loop: asyncio.AbstractEventLoop, prompt: str):
sys.stdout.write(prompt) sys.stdout.write(prompt)
sys.stdout.flush() sys.stdout.flush()
user_input = await loop.run_in_executor(None, sys.stdin.readline) user_input = await loop.run_in_executor(None, sys.stdin.readline)
return user_input.strip() return user_input.strip()
async def find_atvs(loop): async def find_atvs(loop):
devices = await pyatv.scan(loop) devices = await pyatv.scan(loop)
if not devices: if not devices:
@@ -23,14 +25,20 @@ async def find_atvs(loop):
return return
atvs = [] atvs = []
for i in devices: for i in devices:
#Only get Apple TV's # Only get Apple TV's
if i.device_info.model in [DeviceModel.Gen4, DeviceModel.Gen4K, DeviceModel.AppleTV4KGen2]: if i.device_info.model in [
#if i.device_info.model in [DeviceModel.AppleTV4KGen2]: #FOR TESTING DeviceModel.Gen4,
DeviceModel.Gen4K,
DeviceModel.AppleTV4KGen2,
]:
# if i.device_info.model in [DeviceModel.AppleTV4KGen2]: #FOR TESTING
if input("Found {}. Do you want to add it? (y/n) ".format(i.name)) == "y": if input("Found {}. Do you want to add it? (y/n) ".format(i.name)) == "y":
identifier = i.identifier identifier = i.identifier
pairing = await pyatv.pair(i, loop=loop, protocol=pyatv.Protocol.AirPlay) pairing = await pyatv.pair(
i, loop=loop, protocol=pyatv.Protocol.AirPlay
)
await pairing.begin() await pairing.begin()
if pairing.device_provides_pin: if pairing.device_provides_pin:
pin = await _read_input(loop, "Enter PIN on screen: ") pin = await _read_input(loop, "Enter PIN on screen: ")
@@ -39,18 +47,23 @@ async def find_atvs(loop):
await pairing.finish() await pairing.finish()
if pairing.has_paired: if pairing.has_paired:
creds = pairing.service.credentials creds = pairing.service.credentials
atvs.append({"identifier": identifier, "airplay_credentials": creds}) atvs.append(
{"identifier": identifier, "airplay_credentials": creds}
)
print("Pairing successful") print("Pairing successful")
await pairing.close() await pairing.close()
return atvs return atvs
def main(config, config_file, debug): def main(config, config_file, debug):
try: num_atvs = len(config["atvs"]) try:
except: num_atvs = 0 num_atvs = len(config["atvs"])
if input("Found {} Apple TV(s) in config.json. Add more? (y/n) ".format(num_atvs)) == "y": except:
num_atvs = 0
if (
input("Found {} Apple TV(s) in config.json. Add more? (y/n) ".format(num_atvs))
== "y"
):
loop = asyncio.get_event_loop_policy().get_event_loop() loop = asyncio.get_event_loop_policy().get_event_loop()
if debug: if debug:
loop.set_debug(True) loop.set_debug(True)
@@ -62,39 +75,42 @@ def main(config, config_file, debug):
for i in atvs: for i in atvs:
config["atvs"].append(i) config["atvs"].append(i)
print("done adding") print("done adding")
except: except:
print("rewriting atvs (don't worry if none were saved before)") print("rewriting atvs (don't worry if none were saved before)")
config["atvs"] = atvs config["atvs"] = atvs
try : apikey = config["apikey"] try:
except: apikey = config["apikey"]
except:
apikey = "" apikey = ""
if apikey != "" : if apikey != "":
if input("Apikey already specified. Change it? (y/n) ") == "y": if input("Apikey already specified. Change it? (y/n) ") == "y":
apikey = input("Enter your API key: ") apikey = input("Enter your API key: ")
config["apikey"] = apikey config["apikey"] = apikey
else: else:
print("get youtube apikey here: https://developers.google.com/youtube/registering_an_application") print(
"get youtube apikey here: https://developers.google.com/youtube/registering_an_application"
)
apikey = input("Enter your API key: ") apikey = input("Enter your API key: ")
config["apikey"] = apikey config["apikey"] = apikey
try: skip_categories = config["skip_categories"] try:
except: skip_categories = config["skip_categories"]
except:
skip_categories = [] skip_categories = []
if skip_categories != []: if skip_categories != []:
if input("Skip categories already specified. Change them? (y/n) ") == "y": if input("Skip categories already specified. Change them? (y/n) ") == "y":
categories = input("Enter skip categories (space sepparated) Options: [sponsor, selfpromo, exclusive_access, interaction, poi_highlight, intro, outro, preview, filler, music_offtopic:\n") categories = input(
"Enter skip categories (space sepparated) Options: [sponsor, selfpromo, exclusive_access, interaction, poi_highlight, intro, outro, preview, filler, music_offtopic:\n"
)
skip_categories = categories.split(" ") skip_categories = categories.split(" ")
else: else:
categories = input("Enter skip categories (space sepparated) Options: [sponsor, selfpromo, exclusive_access, interaction, poi_highlight, intro, outro, preview, filler, music_offtopic:\n") categories = input(
"Enter skip categories (space sepparated) Options: [sponsor, selfpromo, exclusive_access, interaction, poi_highlight, intro, outro, preview, filler, music_offtopic:\n"
)
skip_categories = categories.split(" ") skip_categories = categories.split(" ")
config["skip_categories"] = skip_categories config["skip_categories"] = skip_categories
print("config finished") print("config finished")
save_config(config, config_file) save_config(config, config_file)
if __name__ == "__main__":
print("starting")
main()

View File

@@ -0,0 +1,6 @@
userAgent = "iSponsorBlockTV/0.1"
SponsorBlock_service = "youtube"
SponsorBlock_actiontype = "skip"
SponsorBlock_api = "https://sponsor.ajay.app/api/"
Youtube_api = "https://www.googleapis.com/youtube/v3/"

View File

@@ -5,42 +5,51 @@ from . import macos_install
import json import json
import os import os
import logging import logging
import sys
def load_config(config_file): def load_config(config_file):
try: if os.path.exists(config_file):
with open(config_file) as f: try:
config = json.load(f) with open(config_file, "r") as f:
except: config = json.load(f)
if os.getenv('iSPBTV_docker'): except:
print("You are running in docker, you have to mount the config file.\nPlease check the README.md for more information.") print("Creating config file")
config = {}
else:
if os.getenv("iSPBTV_docker"):
print(
"You are running in docker, you have to mount the config file.\nPlease check the README.md for more information."
)
sys.exit()
return
else: else:
config = {} #Create blank config to setup print("Creating config file")
config = {} # Create blank config to setup
return config return config
def app_start(): def app_start():
parser = argparse.ArgumentParser(description='iSponsorblockTV') parser = argparse.ArgumentParser(description="iSponsorblockTV")
parser.add_argument('--file', '-f', default='config.json', help='config file') parser.add_argument("--file", "-f", default="config.json", help="config file")
parser.add_argument('--setup', '-s', action='store_true', help='setup the program') parser.add_argument("--setup", "-s", action="store_true", help="setup the program")
parser.add_argument('--debug', '-d', action='store_true', help='debug mode') parser.add_argument("--debug", "-d", action="store_true", help="debug mode")
parser.add_argument('--macos_install', action='store_true', help='install in macOS') parser.add_argument("--macos_install", action="store_true", help="install in macOS")
args = parser.parse_args() args = parser.parse_args()
config = load_config(args.file) config = load_config(args.file)
if args.debug: if args.debug:
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
if args.setup: #Setup the config file if args.setup: # Setup the config file
config_setup.main(config, args.file, args.debug) config_setup.main(config, args.file, args.debug)
if args.macos_install: elif args.macos_install:
macos_install.main() macos_install.main()
else:
try: #Check if config file has the correct structure
config["atvs"], config["apikey"], config["skip_categories"]
except: #If not, ask to setup the program
print("invalid config file, please run with --setup")
os.exit()
main.main(config["atvs"], config["apikey"], config["skip_categories"], args.debug)
if __name__ == "__main__": else:
app_start() try: # Check if config file has the correct structure
config["atvs"], config["apikey"], config["skip_categories"]
except: # If not, ask to setup the program
print("invalid config file, please run with --setup")
sys.exit()
main.main(
config["atvs"], config["apikey"], config["skip_categories"], args.debug
)

View File

@@ -2,15 +2,18 @@ import plistlib
import os import os
from . import config_setup from . import config_setup
default_plist = {"Label": "com.dmunozv04iSponsorBlockTV", default_plist = {
"RunAtLoad": True, "Label": "com.dmunozv04iSponsorBlockTV",
"StartInterval": 20, "RunAtLoad": True,
"EnvironmentVariables": {"PYTHONUNBUFFERED": "YES"}, "StartInterval": 20,
"StandardErrorPath": "", #Fill later "EnvironmentVariables": {"PYTHONUNBUFFERED": "YES"},
"StandardOutPath": "", "StandardErrorPath": "", # Fill later
"ProgramArguments" : "", "StandardOutPath": "",
"WorkingDirectory": "" "ProgramArguments": "",
} "WorkingDirectory": "",
}
def create_plist(path): def create_plist(path):
plist = default_plist plist = default_plist
plist["ProgramArguments"] = [path + "/iSponsorBlockTV-macos"] plist["ProgramArguments"] = [path + "/iSponsorBlockTV-macos"]
@@ -19,14 +22,16 @@ def create_plist(path):
plist["WorkingDirectory"] = path plist["WorkingDirectory"] = path
launchd_path = os.path.expanduser("~/Library/LaunchAgents/") launchd_path = os.path.expanduser("~/Library/LaunchAgents/")
path_to_save = launchd_path + "com.dmunozv04.iSponsorBlockTV.plist" path_to_save = launchd_path + "com.dmunozv04.iSponsorBlockTV.plist"
with open(path_to_save, 'wb') as fp: with open(path_to_save, "wb") as fp:
plistlib.dump(plist, fp) plistlib.dump(plist, fp)
def run_setup(file): def run_setup(file):
config = {} config = {}
config_setup.main(config, file, debug=False) config_setup.main(config, file, debug=False)
def main(): def main():
correct_path = os.path.expanduser("~/iSponsorBlockTV") correct_path = os.path.expanduser("~/iSponsorBlockTV")
if os.path.isfile(correct_path + "/iSponsorBlockTV-macos"): if os.path.isfile(correct_path + "/iSponsorBlockTV-macos"):
@@ -34,13 +39,15 @@ def main():
print("The launch daemon will now be installed") print("The launch daemon will now be installed")
create_plist(correct_path) create_plist(correct_path)
run_setup(correct_path + "/config.json") run_setup(correct_path + "/config.json")
print("Launch daemon installed. Please restart the computer to enable it or use:\n launchctl load ~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist") print(
"Launch daemon installed. Please restart the computer to enable it or use:\n launchctl load ~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist"
)
else: else:
if not os.path.exists(correct_path): if not os.path.exists(correct_path):
os.makedirs(correct_path) os.makedirs(correct_path)
print("Please move the program to the correct path: " + correct_path + "opeing now on finder...") print(
"Please move the program to the correct path: "
+ correct_path
+ "opeing now on finder..."
)
os.system("open -R " + correct_path) os.system("open -R " + correct_path)
if __name__ == "__main__":
main()

View File

@@ -1,9 +1,10 @@
import asyncio import asyncio
import pyatv import pyatv
import aiohttp import aiohttp
from cache import AsyncTTL
import time import time
import logging import logging
from . import api_helpers
def listToTuple(function): def listToTuple(function):
def wrapper(*args): def wrapper(*args):
@@ -11,13 +12,15 @@ def listToTuple(function):
result = function(*args) result = function(*args)
result = tuple(result) if type(result) == list else result result = tuple(result) if type(result) == list else result
return result return result
return wrapper return wrapper
class MyPushListener(pyatv.interface.PushListener): class MyPushListener(pyatv.interface.PushListener):
task = None task = None
apikey = None apikey = None
rc = None rc = None
web_session = None web_session = None
categories = ["sponsor"] categories = ["sponsor"]
@@ -27,8 +30,7 @@ class MyPushListener(pyatv.interface.PushListener):
self.web_session = web_session self.web_session = web_session
self.categories = categories self.categories = categories
self.atv = atv self.atv = atv
def playstatus_update(self, updater, playstatus): def playstatus_update(self, updater, playstatus):
logging.debug("Playstatus update" + str(playstatus)) logging.debug("Playstatus update" + str(playstatus))
try: try:
@@ -36,84 +38,70 @@ class MyPushListener(pyatv.interface.PushListener):
except: except:
pass pass
time_start = time.time() time_start = time.time()
self.task = asyncio.create_task(process_playstatus(playstatus, self.apikey, self.rc, self.web_session, self.categories, self.atv, time_start)) self.task = asyncio.create_task(
process_playstatus(
playstatus,
self.apikey,
self.rc,
self.web_session,
self.categories,
self.atv,
time_start,
)
)
def playstatus_error(self, updater, exception): def playstatus_error(self, updater, exception):
logging.error(exception) logging.error(exception)
print("stopped") print("stopped")
async def process_playstatus(playstatus, apikey, rc, web_session, categories, atv, time_start): async def process_playstatus(
playstatus, apikey, rc, web_session, categories, atv, time_start
):
logging.debug("App playing is:" + str(atv.metadata.app.identifier)) logging.debug("App playing is:" + str(atv.metadata.app.identifier))
if playstatus.device_state == playstatus.device_state.Playing and atv.metadata.app.identifier == "com.google.ios.youtube": if (
vid_id = await get_vid_id(playstatus.title, playstatus.artist, apikey, web_session) playstatus.device_state == playstatus.device_state.Playing
print(vid_id) and atv.metadata.app.identifier == "com.google.ios.youtube"
segments, duration = await get_segments(vid_id, web_session, categories) ):
print(segments) vid_id = await api_helpers.get_vid_id(
await time_to_segment(segments, playstatus.position, rc, time_start) playstatus.title, playstatus.artist, apikey, web_session
)
if vid_id:
@AsyncTTL(time_to_live=300, maxsize=5) print(vid_id)
async def get_vid_id(title, artist, api_key, web_session): segments = await api_helpers.get_segments(vid_id, web_session, categories)
url = f"https://youtube.googleapis.com/youtube/v3/search?q={title} - {artist}&key={api_key}&maxResults=1" print(segments)
async with web_session.get(url) as response: await time_to_segment(
response = await response.json() segments, playstatus.position, rc, time_start, web_session
vid_id = response["items"][0]["id"]["videoId"] )
return vid_id else:
print("Could not find video id")
@listToTuple
@AsyncTTL(time_to_live=300, maxsize=5)
async def get_segments(vid_id, web_session, categories = ["sponsor"]):
params = {"videoID": vid_id,
"category": categories,
"actionType": "skip",
"service": "youtube"}
headers = {'Accept': 'application/json'}
url = "https://sponsor.ajay.app/api/skipSegments"
async with web_session.get(url, headers = headers, params = params) as response:
response = await response.json()
segments = []
try:
duration = response[0]["videoDuration"]
for i in response:
segment = i["segment"]
try:
#Get segment before to check if they are too close to each other
segment_before_end = segments[-1][1]
segment_before_start = segments[-1][0]
except:
segment_before_end = -10
if segment[0] - segment_before_end < 1: #Less than 1 second appart, combine them and skip them together
segment = [segment_before_start, segment[1]]
segments.pop()
segments.append(segment)
except:
duration = 0
return segments, duration
async def time_to_segment(segments, position, rc, time_start): async def time_to_segment(segments, position, rc, time_start, web_session):
position = position + (time.time() - time_start) position = position + (time.time() - time_start)
for segment in segments: for segment in segments:
if position < 2 and (position >= segment[0] and position < segment[1]): if position < 2 and (
next_segment = [position, segment[1]] position >= segment["start"] and position < segment["end"]
):
next_segment = [position, segment["end"]]
break break
if segment[0] > position: if segment["start"] > position:
next_segment = segment next_segment = segment
break break
time_to_next = next_segment[0] - position time_to_next = next_segment["start"] - position
await skip(time_to_next, next_segment[1], rc) await skip(time_to_next, next_segment["end"], next_segment["UUID"], rc, web_session)
async def skip(time_to, position, rc):
async def skip(time_to, position, UUID, rc, web_session):
await asyncio.sleep(time_to) await asyncio.sleep(time_to)
await rc.set_position(position) await rc.set_position(position)
# await api_helpers.viewed_segments(UUID, web_session) DISABLED FOR NOW
async def connect_atv(loop, identifier, airplay_credentials): async def connect_atv(loop, identifier, airplay_credentials):
"""Find a device and print what is playing.""" """Find a device and print what is playing."""
print("Discovering devices on network...") print("Discovering devices on network...")
atvs = await pyatv.scan(loop, identifier = identifier) atvs = await pyatv.scan(loop, identifier=identifier)
if not atvs: if not atvs:
print("No device found, will retry") print("No device found, will retry")
@@ -142,7 +130,7 @@ async def loop_atv(event_loop, atv_config, apikey, categories, web_session):
atv.metadata.app atv.metadata.app
except: except:
print("Reconnecting to Apple TV") print("Reconnecting to Apple TV")
#reconnect to apple tv # reconnect to apple tv
atv = await connect_atv(event_loop, identifier, airplay_credentials) atv = await connect_atv(event_loop, identifier, airplay_credentials)
if atv: if atv:
listener = MyPushListener(apikey, atv, categories, web_session) listener = MyPushListener(apikey, atv, categories, web_session)
@@ -150,10 +138,6 @@ async def loop_atv(event_loop, atv_config, apikey, categories, web_session):
atv.push_updater.listener = listener atv.push_updater.listener = listener
atv.push_updater.start() atv.push_updater.start()
print("Push updater started") print("Push updater started")
def main(atv_configs, apikey, categories, debug): def main(atv_configs, apikey, categories, debug):
@@ -165,8 +149,3 @@ def main(atv_configs, apikey, categories, debug):
for i in atv_configs: for i in atv_configs:
loop.create_task(loop_atv(loop, i, apikey, categories, web_session)) loop.create_task(loop_atv(loop, i, apikey, categories, web_session))
loop.run_forever() loop.run_forever()
if __name__ == "__main__":
print("starting")
main()

View File

@@ -2,6 +2,6 @@ from iSponsorBlockTV import helpers
import sys import sys
import os import os
if getattr(sys, 'frozen', False): if getattr(sys, "frozen", False):
os.environ['SSL_CERT_FILE'] = os.path.join(sys._MEIPASS, 'lib', 'cert.pem') os.environ["SSL_CERT_FILE"] = os.path.join(sys._MEIPASS, "lib", "cert.pem")
helpers.app_start() helpers.app_start()

View File

@@ -1,3 +1,3 @@
from iSponsorBlockTV import helpers from iSponsorBlockTV import helpers
helpers.app_start() helpers.app_start()