mirror of
https://github.com/dmunozv04/iSponsorBlockTV.git
synced 2025-12-10 14:06:44 +03:00
utilize standard logging libraries and formats
This commit is contained in:
@@ -1,160 +0,0 @@
|
||||
import logging
|
||||
|
||||
from rich.logging import RichHandler
|
||||
from rich.style import Style
|
||||
from rich.text import Text
|
||||
|
||||
"""
|
||||
Copyright (c) 2020 Will McGugan
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
Modified code from rich (https://github.com/textualize/rich)
|
||||
"""
|
||||
|
||||
|
||||
class LogHandler(RichHandler):
|
||||
def __init__(self, device_name, log_name_len, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.filter_strings = []
|
||||
self._log_render = LogRender(
|
||||
device_name=device_name,
|
||||
log_name_len=log_name_len,
|
||||
show_time=True,
|
||||
show_level=True,
|
||||
show_path=False,
|
||||
time_format="[%x %X]",
|
||||
omit_repeated_times=True,
|
||||
level_width=None,
|
||||
)
|
||||
|
||||
def add_filter_string(self, s):
|
||||
self.filter_strings.append(s)
|
||||
|
||||
def _filter(self, s):
|
||||
for i in self.filter_strings:
|
||||
s = s.replace(i, "REDACTED")
|
||||
return s
|
||||
|
||||
def format(self, record):
|
||||
original = super().format(record)
|
||||
return self._filter(original)
|
||||
|
||||
|
||||
class LogRender:
|
||||
def __init__(
|
||||
self,
|
||||
device_name,
|
||||
log_name_len,
|
||||
show_time=True,
|
||||
show_level=False,
|
||||
show_path=True,
|
||||
time_format="[%x %X]",
|
||||
omit_repeated_times=True,
|
||||
level_width=8,
|
||||
):
|
||||
self.filter_strings = []
|
||||
self.log_name_len = log_name_len
|
||||
self.device_name = device_name
|
||||
self.show_time = show_time
|
||||
self.show_level = show_level
|
||||
self.show_path = show_path
|
||||
self.time_format = time_format
|
||||
self.omit_repeated_times = omit_repeated_times
|
||||
self.level_width = level_width
|
||||
self._last_time = None
|
||||
|
||||
def __call__(
|
||||
self,
|
||||
console,
|
||||
renderables,
|
||||
log_time,
|
||||
time_format=None,
|
||||
level="",
|
||||
path=None,
|
||||
line_no=None,
|
||||
link_path=None,
|
||||
):
|
||||
from rich.containers import Renderables
|
||||
from rich.table import Table
|
||||
|
||||
output = Table.grid(padding=(0, 1))
|
||||
output.expand = True
|
||||
if self.show_time:
|
||||
output.add_column(style="log.time")
|
||||
if self.show_level:
|
||||
output.add_column(style="log.level", width=self.level_width)
|
||||
output.add_column(
|
||||
width=self.log_name_len, style=Style(color="yellow"), overflow="fold"
|
||||
)
|
||||
output.add_column(ratio=1, style="log.message", overflow="fold")
|
||||
if self.show_path and path:
|
||||
output.add_column(style="log.path")
|
||||
row = []
|
||||
if self.show_time:
|
||||
log_time = log_time or console.get_datetime()
|
||||
time_format = time_format or self.time_format
|
||||
if callable(time_format):
|
||||
log_time_display = time_format(log_time)
|
||||
else:
|
||||
log_time_display = Text(log_time.strftime(time_format))
|
||||
if log_time_display == self._last_time and self.omit_repeated_times:
|
||||
row.append(Text(" " * len(log_time_display)))
|
||||
else:
|
||||
row.append(log_time_display)
|
||||
self._last_time = log_time_display
|
||||
if self.show_level:
|
||||
row.append(level)
|
||||
row.append(Text(self.device_name))
|
||||
row.append(Renderables(renderables))
|
||||
if self.show_path and path:
|
||||
path_text = Text()
|
||||
path_text.append(
|
||||
path, style=f"link file://{link_path}" if link_path else ""
|
||||
)
|
||||
if line_no:
|
||||
path_text.append(":")
|
||||
path_text.append(
|
||||
f"{line_no}",
|
||||
style=f"link file://{link_path}#{line_no}" if link_path else "",
|
||||
)
|
||||
row.append(path_text)
|
||||
|
||||
output.add_row(*row)
|
||||
return output
|
||||
|
||||
|
||||
class LogFormatter(logging.Formatter):
|
||||
def __init__(
|
||||
self, fmt=None, datefmt=None, style="%", validate=True, filter_strings=None
|
||||
):
|
||||
super().__init__(fmt, datefmt, style, validate)
|
||||
self.filter_strings = filter_strings or []
|
||||
|
||||
def add_filter_string(self, s):
|
||||
self.filter_strings.append(s)
|
||||
|
||||
def _filter(self, s):
|
||||
print(s)
|
||||
for i in self.filter_strings:
|
||||
s = s.replace(i, "REDACTED")
|
||||
return s
|
||||
|
||||
def format(self, record):
|
||||
original = logging.Formatter.format(self, record)
|
||||
return self._filter(original)
|
||||
@@ -7,7 +7,7 @@ from typing import Optional
|
||||
import aiohttp
|
||||
import rich
|
||||
|
||||
from . import api_helpers, logging_helpers, ytlounge
|
||||
from . import api_helpers, ytlounge
|
||||
|
||||
|
||||
class DeviceListener:
|
||||
@@ -22,9 +22,10 @@ class DeviceListener:
|
||||
self.logger.setLevel(logging.DEBUG)
|
||||
else:
|
||||
self.logger.setLevel(logging.INFO)
|
||||
rh = logging_helpers.LogHandler(device.name, log_name_len, level=logging.DEBUG)
|
||||
rh.add_filter_string(device.screen_id)
|
||||
self.logger.addHandler(rh)
|
||||
sh = logging.StreamHandler()
|
||||
sh.setFormatter(logging.Formatter(
|
||||
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
|
||||
self.logger.addHandler(sh)
|
||||
self.logger.info(f"Starting device")
|
||||
self.lounge_controller = ytlounge.YtLoungeApi(
|
||||
device.screen_id, config, api_helper, self.logger
|
||||
@@ -74,7 +75,7 @@ class DeviceListener:
|
||||
"Connected to device %s (%s)", lounge_controller.screen_name, self.name
|
||||
)
|
||||
try:
|
||||
# print("Subscribing to lounge")
|
||||
self.logger.info("Subscribing to lounge")
|
||||
sub = await lounge_controller.subscribe_monitored(self)
|
||||
await sub
|
||||
except:
|
||||
|
||||
@@ -102,7 +102,6 @@ class Device(Element):
|
||||
"""A device element."""
|
||||
|
||||
def process_values_from_data(self):
|
||||
print("HIIII")
|
||||
print(self.element_data)
|
||||
if "name" in self.element_data and self.element_data["name"]:
|
||||
self.element_name = self.element_data["name"]
|
||||
|
||||
@@ -67,23 +67,23 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
|
||||
data = args[0]
|
||||
# Unmute when the video starts playing
|
||||
if self.mute_ads and data.get("state", "0") == "1":
|
||||
# print("Ad has ended, unmuting")
|
||||
self.logger.info("Ad has ended, unmuting")
|
||||
create_task(self.mute(False, override=True))
|
||||
elif event_type == "onAdStateChange":
|
||||
data = args[0]
|
||||
if data["adState"] == "0": # Ad is not playing
|
||||
# print("Ad has ended, unmuting")
|
||||
self.logger.info("Ad has ended, unmuting")
|
||||
create_task(self.mute(False, override=True))
|
||||
elif (
|
||||
self.skip_ads and data["isSkipEnabled"] == "true"
|
||||
): # YouTube uses strings for booleans
|
||||
self._logger.info("Ad can be skipped, skipping")
|
||||
self.logger.info("Ad can be skipped, skipping")
|
||||
create_task(self.skip_ad())
|
||||
create_task(self.mute(False, override=True))
|
||||
elif (
|
||||
self.mute_ads
|
||||
): # Seen multiple other adStates, assuming they are all ads
|
||||
self._logger.info("Ad has started, muting")
|
||||
self.logger.info("Ad has started, muting")
|
||||
create_task(self.mute(True, override=True))
|
||||
# Manages volume, useful since YouTube wants to know the volume when unmuting (even if they already have it)
|
||||
elif event_type == "onVolumeChanged":
|
||||
@@ -94,7 +94,7 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
|
||||
if len(args) > 0 and (
|
||||
vid_id := args[0]["videoId"]
|
||||
): # if video id is not empty
|
||||
print(f"Getting segments for next video: {vid_id}")
|
||||
self.logger.info(f"Getting segments for next video: {vid_id}")
|
||||
create_task(self.api_helper.get_segments(vid_id))
|
||||
|
||||
# #Used to know if an ad is skippable or not
|
||||
@@ -102,18 +102,18 @@ class YtLoungeApi(pyytlounge.YtLoungeApi):
|
||||
data = args[0]
|
||||
# Gets segments for the next video (after the ad) before it starts playing
|
||||
if vid_id := data["contentVideoId"]:
|
||||
self._logger.info(f"Getting segments for next video: {vid_id}")
|
||||
self.logger.info(f"Getting segments for next video: {vid_id}")
|
||||
create_task(self.api_helper.get_segments(vid_id))
|
||||
elif (
|
||||
self.skip_ads and data["isSkipEnabled"] == "true"
|
||||
): # YouTube uses strings for booleans
|
||||
self._logger.info("Ad can be skipped, skipping")
|
||||
self.logger.info("Ad can be skipped, skipping")
|
||||
create_task(self.skip_ad())
|
||||
create_task(self.mute(False, override=True))
|
||||
elif (
|
||||
self.mute_ads
|
||||
): # Seen multiple other adStates, assuming they are all ads
|
||||
self._logger.info("Ad has started, muting")
|
||||
self.logger.info("Ad has started, muting")
|
||||
create_task(self.mute(True, override=True))
|
||||
|
||||
elif event_type == "loungeStatus":
|
||||
|
||||
Reference in New Issue
Block a user