mirror of
https://github.com/sle118/squeezelite-esp32.git
synced 2025-12-13 15:07:01 +03:00
initial refactoring
This commit is contained in:
185
tools/protoc_utils/ProtoElement.py
Normal file
185
tools/protoc_utils/ProtoElement.py
Normal file
@@ -0,0 +1,185 @@
|
||||
from __future__ import annotations
|
||||
from functools import partial
|
||||
from typing import ClassVar, List, Any, Dict
|
||||
from typing import Callable
|
||||
from google.protobuf import message
|
||||
from google.protobuf.descriptor import Descriptor, FieldDescriptor, FileDescriptor
|
||||
from google.protobuf.descriptor_pb2 import FieldDescriptorProto,DescriptorProto,EnumDescriptorProto
|
||||
import google.protobuf.descriptor_pool as descriptor_pool
|
||||
import logging
|
||||
import copy
|
||||
# import custom_options_pb2 as custom
|
||||
|
||||
RendererType = Callable[['ProtoElement'], Dict]
|
||||
class ProtoElement:
    """Node in a tree of protobuf descriptors used for code generation.

    Wraps either a message Descriptor or a FieldDescriptor and exposes
    naming, comment and default-value helpers used by the renderers.
    Shared state (descriptor pool, message prototypes, comment map, logger,
    renderer) is class-level and must be injected through the set_* class
    methods before elements are instantiated.
    """

    childs: List[ProtoElement]                    # direct child elements
    descriptor: Descriptor | FieldDescriptor      # wrapped descriptor
    comments: Dict[str, str]                      # this element's comments, keyed by kind
    enum_type: EnumDescriptorProto
    _comments: Dict[str, str] = {}                # global comment map (dotted path -> text)
    pool: descriptor_pool.DescriptorPool
    prototypes: dict[str, type[message.Message]]  # message classes keyed by full name
    renderer: RendererType
    package: str
    file: FileDescriptor
    message: str
    options: Dict[str, Any]                       # fixed: was `any` (the builtin function)
    _message_instance: ClassVar
    logger: ClassVar = None                       # fixed: declared so set_logger() can probe it safely

    @classmethod
    def set_prototypes(cls, prototypes: dict[str, type[message.Message]]):
        """Install the message-class lookup table shared by all elements."""
        cls.prototypes = prototypes

    @classmethod
    def set_comments_base(cls, comments: Dict[str, str]):
        """Install the global comment map (dotted path -> comment text)."""
        cls._comments = comments

    @classmethod
    def set_pool(cls, pool: descriptor_pool.DescriptorPool):
        """Install the descriptor pool used to resolve message types."""
        cls.pool = pool

    @classmethod
    def set_logger(cls, logger=None):
        """Install *logger*, or lazily create a default one on first use."""
        if logger:
            cls.logger = logger
        elif getattr(cls, 'logger', None) is None:
            # Fixed: the original read cls.logger before the attribute ever
            # existed, raising AttributeError when no logger had been set.
            cls.logger = logging.getLogger(__name__)
            logging.basicConfig(
                level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )

    @classmethod
    def set_render(cls, render):
        """Install the renderer callable; __init__ binds it per instance."""
        cls.render_class = render

    def __init__(self, descriptor: Descriptor | FieldDescriptor, parent=None):
        ProtoElement.set_logger()
        self.descriptor = descriptor
        self.file = descriptor.file
        # Message descriptors expose `file` (which carries the package);
        # field descriptors fall back to the parent element's package.
        self.package = getattr(descriptor, "file", parent).package
        self.descriptorname = descriptor.name
        self.json_name = getattr(descriptor, 'json_name', '')
        self.type_name = getattr(descriptor, 'type_name', descriptor.name)

        self.parent = parent
        self.fullname = descriptor.full_name
        # Message descriptors have no `type`; treat them as TYPE_MESSAGE.
        self.type = getattr(descriptor, 'type', FieldDescriptor.TYPE_MESSAGE)

        if self.type == FieldDescriptor.TYPE_MESSAGE:
            try:
                self._message_instance = self.prototypes[self.descriptor.message_type.full_name]()
            except (AttributeError, KeyError):  # fixed: was a bare except
                # Top-level Descriptors have no message_type: use their own full name.
                self._message_instance = self.prototypes[self.descriptor.full_name]()
        self.label = getattr(descriptor, 'label', None)
        self.childs = []
        if descriptor.has_options:
            self.options = {descr.name: value for descr, value in descriptor.GetOptions().ListFields()}
        else:
            self.options = {}
        try:
            # Inherit options from the containing message, when there is one.
            if descriptor.containing_type.has_options:
                self.options.update({descr.name: value for descr, value in descriptor.containing_type.GetOptions().ListFields()})
        except AttributeError:  # fixed: was a bare except — top-level descriptors have no containing_type
            pass

        self.render = partial(self.render_class, self)
        # Keep only the comments addressed to this element, keyed by their
        # last path segment ("leading", "trailing", "detached").
        self.comments = {comment.split('.')[-1]: self._comments[comment] for comment in self._comments.keys() if comment.startswith(self.path)}

    @property
    def cpp_type(self) -> str:
        """C-style type name: <package>_<containing message>."""
        return f'{self.package}_{self.descriptor.containing_type.name}'

    @property
    def cpp_member(self) -> str:
        """Member name as used in generated C code."""
        return self.name

    @property
    def cpp_type_member_prefix(self) -> str:
        return f'{self.cpp_type}_{self.cpp_member}'

    @property
    def cpp_type_member(self) -> str:
        return f'{self.cpp_type}.{self.cpp_member}'

    @property
    def main_message(self) -> bool:
        """True for a root element (one built without a parent)."""
        return self.parent is None  # fixed: was `== None`

    @property
    def parent(self) -> ProtoElement:
        return self._parent

    @parent.setter
    def parent(self, value: ProtoElement):
        # Registering with the parent keeps the tree bidirectional.
        self._parent = value
        if value:
            self._parent.childs.append(self)

    @property
    def root(self) -> ProtoElement:
        # Fixed: walk all the way up the tree instead of returning the
        # direct parent (which was only correct for depth-1 trees).
        return self if not self.parent else self.parent.root

    @property
    def enum_type(self) -> EnumDescriptorProto:
        return self.descriptor.enum_type

    @property
    def cpp_root(self):
        return f'{self.cpp_type}_ROOT'

    @property
    def cpp_child(self):
        return f'{self.cpp_type}_CHILD'

    @property
    def message_instance(self):
        """This element's prototype instance, inherited from the parent if unset."""
        return getattr(self, '_message_instance', getattr(self.parent, 'message_instance', None))

    @property
    def tree(self):
        """Readable tree dump, e.g. ``name->(child, child->(...))``."""
        childs = '->(' + ', '.join(c.tree for c in self.childs) + ')' if len(self.childs) > 0 else ''
        return f'{self.name}{childs}'

    @property
    def name(self):
        # Fall back to the parent's name, then the package, for anonymous descriptors.
        return self.descriptorname if len(self.descriptorname) > 0 else self.parent.name if self.parent else self.package

    @property
    def enum_values(self) -> List[str]:
        # Descriptor objects expose `values`; the proto message form exposes `value`.
        return [n.name for n in getattr(self.enum_type, "values", getattr(self.enum_type, "value", []))]

    @property
    def enum_values_str(self) -> str:
        return ', '.join(self.enum_values)

    @property
    def fields(self) -> List[FieldDescriptor]:
        """Fields of this message, or of a field's message type for message fields."""
        return getattr(self.descriptor, "fields", getattr(getattr(self.descriptor, "message_type", None), "fields", None))

    @property
    def _default_value(self):
        """Default value for scalar fields, honoring an explicit `default_value` option."""
        if 'default_value' in self.options:
            return self.options['default_value']
        if self.type in [FieldDescriptorProto.TYPE_INT32, FieldDescriptorProto.TYPE_INT64,
                         FieldDescriptorProto.TYPE_UINT32, FieldDescriptorProto.TYPE_UINT64,
                         FieldDescriptorProto.TYPE_SINT32, FieldDescriptorProto.TYPE_SINT64,
                         FieldDescriptorProto.TYPE_FIXED32, FieldDescriptorProto.TYPE_FIXED64,
                         FieldDescriptorProto.TYPE_SFIXED32, FieldDescriptorProto.TYPE_SFIXED64]:
            return 0
        elif self.type in [FieldDescriptorProto.TYPE_FLOAT, FieldDescriptorProto.TYPE_DOUBLE]:
            return 0.0
        elif self.type == FieldDescriptorProto.TYPE_BOOL:
            return False
        elif self.type in [FieldDescriptorProto.TYPE_STRING, FieldDescriptorProto.TYPE_BYTES]:
            return ""
        elif self.is_enum:
            return self.enum_values[0] if self.enum_values else 0

    @property
    def detached_leading_comments(self) -> str:
        # Fixed: the original returned comments["leading"] while testing
        # for the "detached" key.
        return self.comments.get("detached", "")

    @property
    def leading_comment(self) -> str:
        return self.comments.get("leading", "")

    @property
    def trailing_comment(self) -> str:
        return self.comments.get("trailing", "")

    @property
    def is_enum(self):
        return self.type == FieldDescriptorProto.TYPE_ENUM

    @property
    def path(self) -> str:
        """Dotted path used to match entries in the global comment map."""
        return self.descriptor.full_name

    @property
    def enum_name(self) -> str:
        # Drop the leading package segment of the fully qualified type name.
        return self.type_name.split('.', maxsplit=1)[-1]

    @property
    def repeated(self) -> bool:
        return self.label == FieldDescriptor.LABEL_REPEATED
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
291
tools/protoc_utils/ProtocParser.py
Normal file
291
tools/protoc_utils/ProtocParser.py
Normal file
@@ -0,0 +1,291 @@
|
||||
# !/usr/bin/env python
|
||||
from functools import partial
|
||||
import sys
|
||||
import json
|
||||
from typing import Callable, Dict, List
|
||||
import argparse
|
||||
from abc import ABC, abstractmethod
|
||||
import google.protobuf.descriptor_pool as descriptor_pool
|
||||
|
||||
from google.protobuf import message_factory,message
|
||||
from google.protobuf.message_factory import GetMessageClassesForFiles
|
||||
from google.protobuf.compiler import plugin_pb2 as plugin
|
||||
from google.protobuf.descriptor import FieldDescriptor, Descriptor, FileDescriptor
|
||||
from google.protobuf.descriptor_pb2 import FileDescriptorProto, DescriptorProto, FieldDescriptorProto,FieldOptions
|
||||
from urllib import parse
|
||||
from ProtoElement import ProtoElement
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logging.basicConfig(
|
||||
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
|
||||
|
||||
class ProtocParser(ABC):
    """Base class for protoc plugins.

    Parses the CodeGeneratorRequest from protoc, extracts source comments,
    builds ProtoElement trees for the requested main classes and drives the
    subclass-specific rendering hooks.

    NOTE(review): elements/comments/json_content/main_class_list/param_dict
    are class-level mutables shared by every instance. That is fine for a
    one-shot plugin process but unsafe for multiple parser instances.
    """

    request: plugin.CodeGeneratorRequest
    response: plugin.CodeGeneratorResponse
    elements: List[ProtoElement] = []
    comments: Dict[str, str] = {}
    json_content = {}
    main_class_list: List[str] = []
    param_dict: Dict[str, str] = {}
    pool: descriptor_pool.DescriptorPool
    factory: message_factory

    @abstractmethod
    def render(self, element: ProtoElement):
        """Produce subclass-specific output for a fully built element."""
        pass

    @abstractmethod
    def get_name(self) -> str:
        """Return the plugin name used in log messages."""
        pass

    @abstractmethod
    def end_message(self, classElement: ProtoElement):
        logger.debug(f'END Processing MESSAGE {classElement.name}')

    @abstractmethod
    def start_message(self, classElement: ProtoElement):
        logger.debug(f'START Processing MESSAGE {classElement.name}')

    @abstractmethod
    def start_file(self, file: FileDescriptor):
        logger.debug(f'START Processing file {file.name}')

    @abstractmethod
    def end_file(self, file: FileDescriptor):
        logger.debug(f'END Processing file {file.name}')

    def __init__(self, data):
        """Parse the serialized CodeGeneratorRequest and the plugin parameters.

        Parameters arrive as a comma-separated list of URL-encoded key=value
        pairs in request.parameter.
        """
        self.request = plugin.CodeGeneratorRequest.FromString(data)
        self.response = plugin.CodeGeneratorResponse()
        logger.info(f'Received ${self.get_name()} parameter(s): {self.request.parameter}')
        params = self.request.parameter.split(',')
        self.param_dict = {p.split('=')[0]: parse.unquote(p.split('=')[1]) for p in params if '=' in p}
        if 'const_prefix' not in self.param_dict:
            self.param_dict['const_prefix'] = ""
            # fixed: logger.warn is deprecated in favor of logger.warning
            logger.warning("No option passed for const_prefix. No prefix will be used for option init_from_mac")
        self.main_class_list = self.get_arg(name='main_class', split=True, split_char='!')
        if 'path' in self.param_dict:
            # Extra sys.path entries are '?'-separated inside the 'path' parameter.
            self.param_dict['path'] = self.param_dict['path'].split('?')
            for p in self.param_dict['path']:
                logger.debug(f'Adding to path: {p}')
                sys.path.append(p)
        # Imported for its side effect: registers the custom option
        # extensions in the default descriptor pool.
        import customoptions_pb2 as custom__options__pb2

    def get_arg(self, name: str, default=None, split: bool = False, split_char: str = ';'):
        """Fetch a plugin parameter, with optional default and splitting.

        Returns None (after logging an error) when the parameter is missing
        and no default was supplied.
        """
        result = self.param_dict.get(name, default)
        # Fixed: the original tested `result and len(result) == 0`, which can
        # never be true, so the missing-parameter diagnostics were dead code.
        if not result:
            if not default:
                logger.error(f'Plugin parameter {name} not found')
                result = None
            else:
                result = default
                logger.warning(f'Plugin parameter {name} not found. Defaulting to {str(default)}')
        if split and result:
            result = result.split(split_char)
        logger.debug(f'Returning argument {name}={str(result)}')
        return result

    def get_name_attr(self, proto_element):
        """Return the first naming attribute present on *proto_element* ('package' or 'name')."""
        attributes = ['package', 'name']
        for att in attributes:
            if hasattr(proto_element, att):
                return att
        return None

    def interpret_path(self, path, proto_element):
        """Resolve a source_code_info location path to a dotted element name.

        Recursively walks *proto_element* following the numeric *path*
        (field numbers, then list indices), accumulating name segments.
        Returns None when the path cannot be interpreted.
        """
        if not path:
            if hasattr(proto_element, "name"):
                return proto_element.name
            else:
                return ''

        # Get the next path element
        path_elem = path[0]
        name_att = self.get_name_attr(proto_element)
        if name_att:
            elem_name = getattr(proto_element, name_att)
            elem_sep = '.'
        else:
            elem_name = ''
            elem_sep = ''

        # Ensure the proto_element has a DESCRIPTOR attribute
        if hasattr(proto_element, 'DESCRIPTOR'):
            # Use the DESCRIPTOR to access field information
            descriptor = proto_element.DESCRIPTOR

            # Get the field name from the descriptor
            try:
                field = descriptor.fields_by_number[path_elem]
            except KeyError:  # fixed: was a bare except
                return None

            field_name = field.name
            field_name = field_name.lower().replace('_field_number', '')

            # Access the field if it exists
            if field_name == "extension":
                return field_name

            elif hasattr(proto_element, field_name):
                next_element = getattr(proto_element, field_name)
                if isinstance(next_element, list):
                    # If the next element is a list, use the next path element as an index.
                    # NOTE(review): path[1] is consumed as the index but is also kept
                    # inside path[1:] for the recursion — confirm this is intentional.
                    return f'{elem_name}{elem_sep}{self.interpret_path(path[1:], next_element[path[1]])}'
                else:
                    # If it's not a list, just continue with the next path element
                    return f'{elem_name}{elem_sep}{self.interpret_path(path[1:], next_element)}'
            else:
                return f'{elem_name}{elem_sep}{self.interpret_path(path[1:], proto_element[path_elem])}'
        # If the path cannot be interpreted, return None
        return None

    def extract_comments(self, proto_file: FileDescriptorProto):
        """Collect leading/trailing/detached comments keyed by dotted element path."""
        for location in proto_file.source_code_info.location:
            # The path is a sequence of integers identifying the syntactic location
            path = tuple(location.path)
            leading_comments = location.leading_comments.strip()
            trailing_comments = location.trailing_comments.strip()
            if len(location.leading_detached_comments) > 0:
                logger.debug('found detached comments')

            leading_detached_comments = '\r\n'.join(location.leading_detached_comments)
            if len(leading_comments) == 0 and len(trailing_comments) == 0 and len(leading_detached_comments) == 0:
                continue
            # Map the numeric path back to a dotted element identifier
            element_identifier = self.interpret_path(path, proto_file)
            if element_identifier is not None:
                self.comments[f"{element_identifier}.leading"] = leading_comments
                self.comments[f"{element_identifier}.trailing"] = trailing_comments
                self.comments[f"{element_identifier}.detached"] = leading_detached_comments

    def get_comments(self, field: FieldDescriptorProto, proto_file: FileDescriptorProto, message: DescriptorProto):
        """Return (path, comment) for *field*, or (None, None) when no comment exists."""
        if hasattr(field, 'name'):
            name = getattr(field, 'name')
            commentspath = f"{proto_file.package}.{message.name}.{name}"
            if commentspath in self.comments:
                return commentspath, self.comments[commentspath]
        return None, None

    def get_nested_message(self, field: FieldDescriptorProto, proto_file: FileDescriptorProto):
        """Resolve the message descriptor behind a TYPE_MESSAGE field, if any."""
        # Handle nested message types
        if field.type != FieldDescriptorProto.TYPE_MESSAGE:
            return None

        nested_message_name = field.type_name.split('.')[-1]
        # Look in the current file first, then among already-processed elements.
        nested_message = next((m for m in proto_file.message_type if m.name == nested_message_name), None)
        if not nested_message:
            nested_message = next((m for m in self.elements if m.name == nested_message_name), None)
        if not nested_message:
            logger.error(f'Could not locate message class {field.type_name} ({nested_message_name})')
        return nested_message

    def process_message(self, message: ProtoElement, parent: ProtoElement = None) -> ProtoElement:
        """Recursively build ProtoElement children for every field of *message*."""
        if not message:
            return

        if not message.fields:
            # fixed: logger.warn is deprecated in favor of logger.warning
            logger.warning(f"{message.path} doesn't have fields!")
            return
        for field in message.fields:
            element = ProtoElement(
                parent=message,
                descriptor=field
            )
            logging.debug(f'Element: {element.path}')
            if getattr(field, "message_type", None):
                # Message-typed fields get their own subtree.
                self.process_message(element, message)

    @property
    def packages(self) -> List[str]:
        """Distinct, non-empty package names across all request files."""
        return list(set([proto_file.package for proto_file in self.request.proto_file if proto_file.package]))

    @property
    def file_set(self) -> List[FileDescriptor]:
        """Distinct files that define the requested main classes."""
        # Hoisted the double FindMessageTypeByName lookup of the original.
        descriptors = (self.pool.FindMessageTypeByName(message) for message in self.main_class_list)
        return list({descriptor.file for descriptor in descriptors if descriptor})

    @property
    def proto_files(self) -> List[FileDescriptorProto]:
        """Request files excluding well-known google/nanopb support files."""
        return list(
            proto_file for proto_file in self.request.proto_file if
            not proto_file.name.startswith("google/")
            and not proto_file.name.startswith("nanopb")
            and not proto_file.package.startswith("google.protobuf")
        )

    def get_main_messages_from_file(self, fileDescriptor: FileDescriptor) -> List[Descriptor]:
        """Message descriptors of *fileDescriptor* that are requested main classes."""
        return [message for name, message in fileDescriptor.message_types_by_name.items() if message.full_name in self.main_class_list]

    def process(self) -> None:
        """Main driver: setup, walk every main message, emit the response on stdout."""
        if len(self.proto_files) == 0:
            logger.error('No protocol buffer file selected for processing')
            return
        self.setup()
        logger.info(f'Processing message(s) {", ".join(self.main_class_list)}')

        for fileObj in self.file_set:
            self.start_file(fileObj)
            for message in self.get_main_messages_from_file(fileObj):
                element = ProtoElement(descriptor=message)
                self.start_message(element)
                self.process_message(element)
                self.end_message(element)
            self.end_file(fileObj)
        # protoc expects the serialized CodeGeneratorResponse on stdout.
        sys.stdout.buffer.write(self.response.SerializeToString())

    def setup(self):
        """Extract comments, build the descriptor pool and wire up ProtoElement."""
        for proto_file in self.proto_files:
            logger.debug(f"Extracting comments from : {proto_file.name}")
            self.extract_comments(proto_file)
        self.pool = descriptor_pool.DescriptorPool()
        # NOTE(review): MessageFactory is kept for compatibility; recent
        # protobuf versions deprecate it in favor of message classes.
        self.factory = message_factory.MessageFactory(self.pool)
        for proto_file in self.request.proto_file:
            logger.debug(f'Adding {proto_file.name} to pool')
            self.pool.Add(proto_file)
        self.messages = GetMessageClassesForFiles([f.name for f in self.request.proto_file], self.pool)
        ProtoElement.set_pool(self.pool)
        ProtoElement.set_render(self.render)
        ProtoElement.set_logger(logger)
        ProtoElement.set_comments_base(self.comments)
        ProtoElement.set_prototypes(self.messages)

    @property
    def main_messages(self) -> List[ProtoElement]:
        """Root elements (those built without a parent)."""
        return [ele for ele in self.elements if ele.main_message]

    def get_message_descriptor(self, name) -> Descriptor:
        """Find a message descriptor by short name across all known packages."""
        for package in self.packages:
            qualified_name = f'{package}.{name}' if package else name
            try:
                descriptor = self.pool.FindMessageTypeByName(qualified_name)
                if descriptor:
                    return descriptor
            except KeyError:  # fixed: was a bare except — pool lookup raises KeyError
                pass
        return None

    @classmethod
    def get_data(cls):
        """Read the CodeGeneratorRequest bytes from --source or from stdin."""
        parser = argparse.ArgumentParser(description='Process protobuf and JSON files.')
        parser.add_argument('--source', help='Python source file', default=None)
        args = parser.parse_args()
        if args.source:
            logger.info(f'Loading request data from {args.source}')
            with open(args.source, 'rb') as file:
                data = file.read()
        else:
            data = sys.stdin.buffer.read()
        return data
|
||||
|
||||
BIN
tools/protoc_utils/__pycache__/ProtoElement.cpython-38.pyc
Normal file
BIN
tools/protoc_utils/__pycache__/ProtoElement.cpython-38.pyc
Normal file
Binary file not shown.
BIN
tools/protoc_utils/__pycache__/ProtocParser.cpython-38.pyc
Normal file
BIN
tools/protoc_utils/__pycache__/ProtocParser.cpython-38.pyc
Normal file
Binary file not shown.
84
tools/protoc_utils/generate_bin.py
Normal file
84
tools/protoc_utils/generate_bin.py
Normal file
@@ -0,0 +1,84 @@
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
import importlib.util
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from google.protobuf import json_format
|
||||
from google.protobuf.json_format import MessageToJson
|
||||
|
||||
# Assuming this script is in the same directory as the generated Python files
|
||||
# script_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),'generated/src')
|
||||
# sys.path.append(script_dir)
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
|
||||
def load_protobuf_module(source, includes):
    """Dynamically load the generated ``<stem>_pb2`` module for *source*.

    Args:
        source: path to the .proto file (only its stem and directory are used).
        includes: directories to append to sys.path, or None (argparse's
            ``action='append'`` default when --include was never passed).

    Returns:
        The loaded module, or None if it could not be loaded.
    """
    # Fixed: tolerate includes=None and avoid unbounded duplicate sys.path entries.
    for include in includes or ():
        if include not in sys.path:
            sys.path.append(include)

    module_name = Path(source).stem
    module_file = module_name + '_pb2.py'
    module_location = Path(source).parent / module_file

    logging.debug(f'Loading module {module_file} from {module_location} with includes [{", ".join(includes or [])}]')

    spec = importlib.util.spec_from_file_location(name=module_name, location=str(module_location))
    if spec is None:
        logging.error(f'Failed to load module {module_file} from {module_location}')
        return None
    module = importlib.util.module_from_spec(spec)
    try:
        spec.loader.exec_module(module)
    except (FileNotFoundError, ImportError, SyntaxError) as err:
        # Fixed: a missing/broken *_pb2.py used to raise instead of honoring
        # the documented return-None-on-failure contract.
        logging.error(f'Failed to load module {module_file} from {module_location}: {err}')
        return None
    logging.debug(f'Loaded protobuf module: {module_name}')
    return module
|
||||
|
||||
|
||||
|
||||
def protobuf_to_dict(message):
    """Convert a protobuf message to a dictionary."""
    # JSON round-trip keeps default-valued fields present in the output.
    json_text = MessageToJson(message, including_default_value_fields=True)
    return json.loads(json_text)
|
||||
|
||||
|
||||
def main():
    """CLI entry point: convert one or more JSON files into protobuf binaries."""
    parser = argparse.ArgumentParser(description='Process protobuf and JSON files.')
    parser.add_argument('--proto_file', help='Name of the protobuf file (without extension)')
    parser.add_argument('--main_class', help='Main message class to process')
    parser.add_argument('--target_dir', help='Target directory for output files')
    parser.add_argument('--include', help='Directory where message python files can be found', default=None,action = 'append' )
    parser.add_argument('--json', help='Source JSON file(s)',action = 'append' )

    args = parser.parse_args()

    # Load the protobuf module
    logging.debug(f'Loading modules')
    proto_module = load_protobuf_module(args.proto_file, args.include)

    # Determine the main message class and instantiate an empty message.
    main_message_class = getattr(proto_module, args.main_class)
    message = main_message_class()

    proto_base_name = Path(args.proto_file).stem  # NOTE(review): computed but never used

    # Convert each JSON file to <stem>.bin in the target directory.
    for jsonfile in args.json:
        output_file_base = os.path.join(args.target_dir, Path(jsonfile).stem+".bin")
        logging.debug(f'Converting JSON file {jsonfile} to binary format')
        with open(jsonfile, 'r') as json_file:
            json_data = json.load(json_file)
            # NOTE(review): `message` is reused across iterations, so ParseDict
            # merges into prior state (repeated fields accumulate) — confirm
            # this is intended when multiple --json files are given.
            json_format.ParseDict(json_data, message)
            binary_data = message.SerializeToString()
            with open(output_file_base, 'wb') as bin_file:
                bin_file.write(binary_data)
                logging.info(f'Binary file written to {output_file_base}')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
92
tools/protoc_utils/parse_bin.py
Normal file
92
tools/protoc_utils/parse_bin.py
Normal file
@@ -0,0 +1,92 @@
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
import importlib.util
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from google.protobuf import json_format
|
||||
from google.protobuf.json_format import MessageToJson
|
||||
|
||||
|
||||
# Assuming this script is in the same directory as the generated Python files
|
||||
# script_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),'generated/src')
|
||||
# sys.path.append(script_dir)
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
|
||||
def load_protobuf_module(source, includes):
    """Dynamically load a protobuf module given its file name."""
    for entry in includes:
        sys.path.append(entry)

    stem = Path(source).stem
    pb2_filename = stem + '_pb2.py'
    pb2_location = Path(source).parent / pb2_filename

    logging.info(f'Loading module {pb2_filename} from {pb2_location} with includes [{", ".join(includes)}]')

    spec = importlib.util.spec_from_file_location(name=stem, location=str(pb2_location))
    if spec is None:
        logging.error(f'Failed to load module {pb2_filename} from {pb2_location}')
        return None

    loaded = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(loaded)
    logging.info(f'Loaded protobuf module: {stem}')
    return loaded
|
||||
|
||||
|
||||
|
||||
def protobuf_to_dict(message):
    """Convert a protobuf message to a dictionary."""
    # Serialize to JSON (materializing default values), then parse back.
    return json.loads(MessageToJson(message, including_default_value_fields=True))
|
||||
|
||||
|
||||
def main():
    """CLI entry point: parse a serialized protobuf binary and log it as JSON."""
    parser = argparse.ArgumentParser(description='Process protobuf and JSON files.')
    parser.add_argument('--proto_file', help='Name of the protobuf file (without extension)')
    parser.add_argument('--main_class', help='Main message class to process')
    parser.add_argument('--source', help='Source file to parse')
    parser.add_argument('--include', help='Directory where message python files can be found', default=None,action = 'append' )
    parser.add_argument('--json', help='Source JSON file(s)',action = 'append' )
    args = parser.parse_args()

    # Load the generated *_pb2 module and resolve the requested message class.
    logging.info(f'Loading modules')
    proto_module = load_protobuf_module(args.proto_file, args.include)
    main_message_class = getattr(proto_module, args.main_class)
    message = main_message_class()

    # Fixed: previously parsed into a hard-coded configuration_pb2.Config
    # (ignoring --proto_file/--main_class) and logged json.dumps(config),
    # which raises TypeError because protobuf messages are not JSON
    # serializable — MessageToJson is the correct conversion.
    with open(args.source, 'rb') as bin_file:  # binary mode: serialized protobuf
        data = bin_file.read()
    message.ParseFromString(data)
    logging.info(f'Parsed: {MessageToJson(message, including_default_value_fields=True)}')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
0
tools/protoc_utils/protoc-commands-utils.py
Normal file
0
tools/protoc_utils/protoc-commands-utils.py
Normal file
2
tools/protoc_utils/protoc-gen-defaults.bat
Normal file
2
tools/protoc_utils/protoc-gen-defaults.bat
Normal file
@@ -0,0 +1,2 @@
|
||||
@echo off
|
||||
python.exe "%~dp0protoc-gen-defaults.py" %*
|
||||
132
tools/protoc_utils/protoc-gen-defaults.py
Normal file
132
tools/protoc_utils/protoc-gen-defaults.py
Normal file
@@ -0,0 +1,132 @@
|
||||
# !/usr/bin/env python
|
||||
import os
|
||||
|
||||
import logging
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
from google.protobuf.compiler import plugin_pb2 as plugin
|
||||
from google.protobuf.message_factory import GetMessageClass
|
||||
from google.protobuf.descriptor_pb2 import FileDescriptorProto, DescriptorProto, FieldDescriptorProto,FieldOptions
|
||||
from google.protobuf.descriptor import FieldDescriptor, Descriptor, FileDescriptor
|
||||
from ProtoElement import ProtoElement
|
||||
from ProtocParser import ProtocParser
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logging.basicConfig(
|
||||
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
class BinDefaultsParser(ProtocParser) :
|
||||
    def start_message(self,message:ProtoElement) :
        # Delegate to the base class (which logs the start of message processing).
        super().start_message(message)
|
||||
def end_message(self,message:ProtoElement):
|
||||
super().end_message(message)
|
||||
self.has_error = False
|
||||
default_structure = message.render()
|
||||
if not default_structure:
|
||||
logger.warn(f'No default values for {message.name}')
|
||||
return
|
||||
respfile = self.response.file.add()
|
||||
|
||||
outfilename = f'{message.name}_defaults_pb.bin'
|
||||
with open(os.path.join(self.param_dict.get('defaultspath','.'),outfilename), 'wb') as bin_file:
|
||||
res = default_structure.SerializeToString()
|
||||
bin_file.write(res)
|
||||
logger.info(f'Wrote {bin_file.name}')
|
||||
respfile.name = f'{outfilename}.gen'
|
||||
logger.info(f"Creating binary file for defaults: {respfile.name}")
|
||||
respfile.content = f'Content written to {respfile.name}'
|
||||
|
||||
    def start_file(self,file:FileDescriptor) :
        # Delegate to the base class (which logs the start of file processing).
        super().start_file(file)
|
||||
|
||||
|
||||
    def end_file(self,file:FileDescriptor) :
        # Delegate to the base class (which logs the end of file processing).
        # Annotation corrected: the base class hook receives a FileDescriptor,
        # not a ProtoElement.
        super().end_file(file)
|
||||
|
||||
|
||||
def get_name(self)->str:
|
||||
return 'protoc_plugin_defaults'
|
||||
|
||||
def add_comment_if_exists(element, comment_type: str, path: str) -> dict:
|
||||
comment = getattr(element, f"{comment_type}_comment", "").strip()
|
||||
return {f"__{comment_type}_{path}": comment} if comment else {}
|
||||
def repeated_render(self,element:ProtoElement,obj:any):
|
||||
return [obj] if element.repeated else obj
|
||||
def render(self,element: ProtoElement):
|
||||
if len(element.childs)>0:
|
||||
oneof = getattr(element.descriptor,'containing_oneof',None)
|
||||
if oneof:
|
||||
# we probably shouldn't set default values here
|
||||
pass
|
||||
has_render = False
|
||||
for child in element.childs:
|
||||
rendered = child.render()
|
||||
if rendered:
|
||||
has_render = True
|
||||
# try:
|
||||
if child.repeated:
|
||||
try:
|
||||
getattr(element.message_instance,child.name).extend(rendered)
|
||||
except:
|
||||
getattr(element.message_instance,child.name).extend( [rendered])
|
||||
|
||||
elif child.type == FieldDescriptor.TYPE_MESSAGE:
|
||||
getattr(element.message_instance,child.name).CopyFrom(rendered)
|
||||
else:
|
||||
setattr(element.message_instance,child.name,rendered)
|
||||
# except:
|
||||
# logger.error(f'Unable to assign value from {child.fullname} to {element.fullname}')
|
||||
element.message_instance.SetInParent()
|
||||
if not has_render:
|
||||
return None
|
||||
|
||||
else:
|
||||
|
||||
default_value = element._default_value
|
||||
options = element.options['cust_field'] if 'cust_field' in element.options else None
|
||||
msg_options = element.options['cust_msg'] if 'cust_msg' in element.options else None
|
||||
init_from_mac = getattr(options,'init_from_mac', False) or getattr(msg_options,'init_from_mac', False)
|
||||
|
||||
|
||||
default_value = getattr(options,'default_value', None)
|
||||
global_name = getattr(options,'global_name', None)
|
||||
const_prefix = getattr(options,'const_prefix', self.param_dict['const_prefix'])
|
||||
if init_from_mac:
|
||||
default_value = f'{const_prefix}@@init_from_mac@@'
|
||||
elif default_value:
|
||||
if element.descriptor.cpp_type == FieldDescriptor.CPPTYPE_STRING:
|
||||
default_value = default_value
|
||||
elif element.descriptor.cpp_type == FieldDescriptor.CPPTYPE_ENUM:
|
||||
try:
|
||||
default_value = element.enum_values.index(default_value)
|
||||
except:
|
||||
raise ValueError(f'Invalid default value {default_value} for {element.path}')
|
||||
elif element.descriptor.cpp_type in [FieldDescriptor.CPPTYPE_INT32, FieldDescriptor.CPPTYPE_INT64,
|
||||
FieldDescriptor.CPPTYPE_UINT32, FieldDescriptor.CPPTYPE_UINT64]:
|
||||
int_value = int(default_value)
|
||||
if element.descriptor.cpp_type in [FieldDescriptor.CPPTYPE_UINT32, FieldDescriptor.CPPTYPE_UINT64] and int_value < 0:
|
||||
raise ValueError(f"Negative value for unsigned int type trying to assign {element.path} = {default_value}")
|
||||
default_value = int_value
|
||||
elif element.descriptor.cpp_type in [FieldDescriptor.CPPTYPE_DOUBLE, FieldDescriptor.CPPTYPE_FLOAT]:
|
||||
float_value = float(default_value)
|
||||
if '.' not in default_value:
|
||||
raise ValueError(f"Integer string for float/double type trying to assign {element.path} = {default_value}")
|
||||
default_value = float_value
|
||||
elif element.descriptor.cpp_type == FieldDescriptor.CPPTYPE_BOOL:
|
||||
if default_value.lower() in ['true', 'false']:
|
||||
default_value = default_value.lower() == 'true'
|
||||
else:
|
||||
raise ValueError(f'Invalid boolean value trying to assign {element.path} = {default_value}')
|
||||
if default_value:
|
||||
element.message_instance.SetInParent()
|
||||
return self.repeated_render(element,default_value) if default_value else None
|
||||
return element.message_instance
|
||||
|
||||
if __name__ == '__main__':
    # Plugin entry point: protoc feeds the serialized request on stdin.
    data = ProtocParser.get_data()
    logger.info("Generating binary files for defaults")
    protocParser: BinDefaultsParser = BinDefaultsParser(data)
    protocParser.process()
    # Fixed copy/paste from the JSON plugin: this script emits binary defaults.
    logger.debug('Done generating binary defaults file(s)')
|
||||
2
tools/protoc_utils/protoc-gen-dump.bat
Normal file
2
tools/protoc_utils/protoc-gen-dump.bat
Normal file
@@ -0,0 +1,2 @@
|
||||
@echo off
rem Wrapper: forward all protoc plugin arguments and stdio to the Python
rem implementation located next to this script.
python.exe "%~dp0protoc-gen-dump.py" %*
|
||||
66
tools/protoc_utils/protoc-gen-dump.py
Normal file
66
tools/protoc_utils/protoc-gen-dump.py
Normal file
@@ -0,0 +1,66 @@
|
||||
# !/usr/bin/env python
|
||||
import argparse
|
||||
import sys
|
||||
import os
|
||||
from google.protobuf.compiler import plugin_pb2 as plugin
|
||||
from google.protobuf.compiler.plugin_pb2 import CodeGeneratorResponse
|
||||
from urllib import parse
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
logging.basicConfig(
|
||||
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
|
||||
# def main():
|
||||
# data = sys.stdin.buffer.read()
|
||||
# with open("C:/Users/sle11/Documents/VSCode/squeezelite-esp32/protobuf/generated/src/code_generator_request.bin", "wb") as file:
|
||||
# file.write(data)
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# main()
|
||||
|
||||
|
||||
|
||||
def process(
    request: plugin.CodeGeneratorRequest, response: CodeGeneratorResponse, data
) -> None:
    """Dump the raw CodeGeneratorRequest bytes to a .bin file for offline replay.

    Plugin parameters are parsed from ``request.parameter`` as comma-separated
    ``key=value`` pairs (values URL-decoded). Recognized keys: ``binpath``
    (output directory, default ``./``) and ``path`` ('?'-separated list).

    Side effects: adds one marker file to *response* and writes *data* to disk.
    """
    logger.info(f'Received parameter(s): {request.parameter}')
    params = request.parameter.split(',')
    param_dict = {p.split('=')[0]: parse.unquote(p.split('=')[1]) for p in params if '=' in p}
    # Guard the lookup: the original unconditional access raised KeyError when
    # no 'path' parameter was passed to the plugin.
    if 'path' in param_dict:
        param_dict['path'] = param_dict['path'].split('?')
    basename = "code_generator_request.bin"
    binpath = os.path.join(param_dict.get('binpath', './'), basename)
    # Renamed from 'file' to avoid shadowing between the protoc response File
    # entry and the file handle opened below.
    respfile: CodeGeneratorResponse.File = response.file.add()
    respfile.name = f"{basename}.txt"
    respfile.content = f'Generated binary file {binpath}'
    logger.info(f"Dumping CodeGeneratorRequest object to : {binpath}")

    with open(binpath, "wb") as bin_file:
        bin_file.write(data)
|
||||
def GetData():
    """Return the raw CodeGeneratorRequest bytes, from --source or stdin."""
    arg_parser = argparse.ArgumentParser(description='Process protobuf and JSON files.')
    arg_parser.add_argument('--source', help='Python source file', default=None)
    opts = arg_parser.parse_args()
    # No --source given: protoc pipes the request in on stdin.
    if not opts.source:
        return sys.stdin.buffer.read()
    logger.info(f'Loading request data from {opts.source}')
    with open(opts.source, 'rb') as request_file:
        return request_file.read()
|
||||
|
||||
def main():
    """Plugin entry point: read the request, dump it, emit the response."""
    raw = GetData()
    req = plugin.CodeGeneratorRequest.FromString(raw)
    resp = CodeGeneratorResponse()
    process(req, resp, raw)
    # protoc expects the serialized response on stdout.
    sys.stdout.buffer.write(resp.SerializeToString())
    logger.info('Done dumping request')
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Invoked by protoc as a standalone plugin subprocess.
    main()
|
||||
|
||||
2
tools/protoc_utils/protoc-gen-json
Normal file
2
tools/protoc_utils/protoc-gen-json
Normal file
@@ -0,0 +1,2 @@
|
||||
#!/bin/bash
# Wrapper: forward all protoc plugin arguments and stdio to the Python
# implementation located alongside this script.
python "$(dirname "$0")/protoc-gen-json.py" "$@"
|
||||
2
tools/protoc_utils/protoc-gen-json.bat
Normal file
2
tools/protoc_utils/protoc-gen-json.bat
Normal file
@@ -0,0 +1,2 @@
|
||||
@echo off
rem Wrapper: forward all protoc plugin arguments and stdio to the Python
rem implementation located next to this script.
python.exe "%~dp0protoc-gen-json.py" %*
|
||||
79
tools/protoc_utils/protoc-gen-json.py
Normal file
79
tools/protoc_utils/protoc-gen-json.py
Normal file
@@ -0,0 +1,79 @@
|
||||
# !/usr/bin/env python
|
||||
import os
|
||||
|
||||
import logging
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
from google.protobuf.compiler import plugin_pb2 as plugin
|
||||
from google.protobuf.descriptor_pb2 import FileDescriptorProto, DescriptorProto, FieldDescriptorProto,FieldOptions
|
||||
from google.protobuf.descriptor import FieldDescriptor, Descriptor, FileDescriptor
|
||||
from ProtoElement import ProtoElement
|
||||
from ProtocParser import ProtocParser
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logging.basicConfig(
|
||||
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
class JsonParser(ProtocParser):
    """Protoc plugin that emits a commented JSON template file per message.

    Each template contains one entry per field (holding its default value),
    bracketed by ``__comments_*`` / ``__values_*`` helper keys that carry the
    .proto comments and enum value lists.
    """

    def start_message(self, message: ProtoElement):
        super().start_message(message)

    def end_message(self, message: ProtoElement):
        # Render the whole message tree and add it to the protoc response as a
        # pretty-printed JSON file named after the message.
        super().end_message(message)
        jsonmessage = message.render()
        respfile = self.response.file.add()
        respfile.name = f'{message.name}_pb2.json'
        logger.info(f"Creating new template json file: {respfile.name}")
        respfile.content = json.dumps(jsonmessage, indent=2) + "\r\n"

    def start_file(self, file: FileDescriptor):
        super().start_file(file)
        # NOTE(review): never read again in this class -- possibly leftover
        # state from an earlier revision; confirm before removing.
        self.jsonmessages = {}

    def end_file(self, file: ProtoElement):
        super().end_file(file)

    def get_name(self) -> str:
        """Name reported to the ProtocParser framework."""
        return 'protoc_plugin_json'

    def add_comment_if_exists(element, comment_type: str, path: str) -> dict:
        # NOTE(review): takes no `self`; `element` occupies that slot when
        # called as a method. Appears unused in this class.
        comment = getattr(element, f"{comment_type}_comment", "").strip()
        return {f"__{comment_type}_{path}": comment} if comment else {}

    def repeated_render(self, element: ProtoElement, obj: any):
        # Repeated fields are represented as a single-element JSON array.
        return [obj] if element.repeated else obj

    def render(self, element: ProtoElement) -> Dict:
        """Build the JSON template dict for *element*.

        Message nodes recurse into their children; leaf fields produce their
        default value surrounded by comment/enum helper keys. Key insertion
        order is deliberate: it controls the key order of the emitted JSON.
        """
        result = {}
        if len(element.childs) > 0:
            oneof = getattr(element.descriptor, 'containing_oneof', None)
            if oneof:
                result[f'__one_of_{element.name}'] = f'Choose only one structure for {oneof.full_name}'
            for child in element.childs:
                child_result = child.render()
                if len(child.childs) > 0:
                    # Nested message: keep it under its own key.
                    result[child.name] = child_result
                elif isinstance(child_result, dict):
                    # Leaf field: merge its value and helper keys inline.
                    result.update(child_result)

        else:
            result.update({
                **({f'__comments_{element.name}': element.leading_comment} if element.leading_comment.strip() else {}),
                **({f'__values_{element.name}': element.enum_values_str} if element.is_enum else {}),
                element.name: element._default_value,
                **({f'__comments2_{element.name}': element.trailing_comment} if element.trailing_comment.strip() else {})
            })

        # Trace any recognized custom options present on this element.
        optnames = ['init_from_mac', 'const_prefix', 'read_only', 'default_value']
        if len(element.options) > 0:
            logger.debug(f'{element.name} has options')
            for optname in [optname for optname in optnames if optname in element.options]:
                logger.debug(f"{element.name} [{optname} = {element.options[optname]}]")
        return self.repeated_render(element, result)
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Plugin entry point: protoc feeds the serialized request on stdin.
    request_data = ProtocParser.get_data()
    logger.debug("Generating blank json file(s)")
    json_plugin = JsonParser(request_data)
    json_plugin.process()
    logger.debug('Done generating JSON file(s)')
|
||||
2
tools/protoc_utils/protoc-gen-options.bat
Normal file
2
tools/protoc_utils/protoc-gen-options.bat
Normal file
@@ -0,0 +1,2 @@
|
||||
@echo off
rem Wrapper: forward all protoc plugin arguments and stdio to the Python
rem implementation located next to this script.
python.exe "%~dp0protoc-gen-options.py" %*
|
||||
255
tools/protoc_utils/protoc-gen-options.py
Normal file
255
tools/protoc_utils/protoc-gen-options.py
Normal file
@@ -0,0 +1,255 @@
|
||||
# !/usr/bin/env python
|
||||
import io
|
||||
import os
|
||||
|
||||
import logging
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
from google.protobuf.compiler import plugin_pb2 as plugin
|
||||
from google.protobuf.descriptor_pb2 import FileDescriptorProto, DescriptorProto, FieldDescriptorProto,FieldOptions
|
||||
from google.protobuf.descriptor import FieldDescriptor, Descriptor, FileDescriptor
|
||||
from ProtoElement import ProtoElement
|
||||
from ProtocParser import ProtocParser
|
||||
# Suffix appended to the .proto stem for the generated C source/header pair.
file_suffix = "_options_pb2"
# Banner written at the top of every generated C file.
head_comments = """
/* Automatically generated nanopb header */
/* Generated by protoc-plugin-options */
"""

# Keys with this prefix mark message types whose macro lists were already opened.
PROCESSED_PREFIX = 'processed_'
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
|
||||
class OptionsParser(ProtocParser):
    """Protoc plugin that generates C helper sources for custom field options.

    For each .proto file it opens a C source/header buffer pair
    (``<stem>_options_pb2.c/.h``) and, for every string field carrying custom
    options, emits ``#define`` lines describing the option values so firmware
    code can iterate them through the ``*_ALL_MEMBERS_*`` macros.
    """

    def start_message(self, message: ProtoElement):
        super().start_message(message)

    def reset_data(self):
        """Reset per-file accumulation state."""
        # Fixed: annotations on attribute targets are evaluated at runtime and
        # the original Dict([str, str]) raised TypeError; Dict[str, str] is the
        # intended subscripted form.
        self.all_members_defaults: Dict[str, str] = {}
        self.lines = {}
        self.lines['init_from_mac'] = []

    def writelines(self, lines, out: io.StringIO):
        # Write each line with CRLF endings, matching the generated-file style.
        for l in lines:
            out.write(f'{l}\r\n')

    def render_all_members(self):
        """Deduplicate the accumulated macro lines and flush them to the header."""
        for key in [key for key in self.all_members_defaults.keys() if not '.' in key and not key.startswith(PROCESSED_PREFIX)]:
            # make each line unique
            self.all_members_defaults[key] = set(self.all_members_defaults[key])
            self.all_members_defaults[key] = '\\\n'.join(self.all_members_defaults[key])

        # Member-level defines (keys containing '.') first, then the
        # message-level define names.
        member_defines = '\n'.join([self.all_members_defaults.get(key) for key in self.all_members_defaults.keys() if '.' in key])
        self.c_header.writelines(member_defines)
        message_defines = ',\\\n'.join([key for key in self.all_members_defaults.keys() if not '.' in key])
        self.c_header.writelines(message_defines)

    def end_message(self, message: ProtoElement):
        super().end_message(message)
        self.message_type_name = message.path.replace('.', '_')
        self.global_name = message.options.get("global_name", message.path)
        message.render()
        self.render_all_members()

    def start_file(self, file: FileDescriptor):
        super().start_file(file)
        self.set_source(Path(file.name).stem)
        self.reset_data()

    def end_file(self, file: ProtoElement):
        super().end_file(file)
        # Flush the current C source/header buffers into the response.
        self.set_source(None)

    def get_name(self) -> str:
        """Name reported to the ProtocParser framework."""
        return 'protoc_plugin_options'

    def add_comment_if_exists(element, comment_type: str, path: str) -> dict:
        # NOTE(review): takes no `self`; `element` occupies that slot when
        # called as a method. Appears unused here -- kept for parity with the
        # sibling plugins.
        comment = getattr(element, f"{comment_type}_comment", "").strip()
        return {f"__{comment_type}_{path}": comment} if comment else {}

    def repeated_render(self, element: ProtoElement, obj):
        """Wrap a rendered value in a list when the field is repeated."""
        return [obj] if element.repeated else obj

    def get_option(self, element: ProtoElement, optname: str):
        """Look up a custom option on the field, falling back to the message."""
        options = element.options.get('cust_field', dict())
        msg_options = element.options.get('cust_msg', dict())
        return getattr(options, optname, None) or getattr(msg_options, optname, None)

    def get_nanoppb_option(self, element: ProtoElement, optname: str):
        """Look up a nanopb option (message-level first, then field-level)."""
        options = element.options.get('nanopb_msgopt', dict())
        msg_options = element.options.get('nanopb', dict())
        return getattr(options, optname, None) or getattr(msg_options, optname, None)

    def get_mkey(self, key) -> List[str]:
        """Return the accumulator list for *key*, creating it when absent."""
        if not self.all_members_defaults.get(key):
            self.all_members_defaults[key] = list()
        return self.all_members_defaults[key]

    def append_line(self, key, line):
        self.get_mkey(key).append(line)

    def get_member_assignment(self, element: ProtoElement, value) -> str:
        """Render a C assignment into the union member matching the field type."""
        member = "default_value."
        cpptype = element.descriptor.cpp_type

        if cpptype == FieldDescriptor.CPPTYPE_ENUM:
            member += f'v_enum = {element.cpp_type_member_prefix}_{value}'
        elif cpptype == FieldDescriptor.CPPTYPE_INT32:
            member += f'v_int32 = {value}'
        elif cpptype == FieldDescriptor.CPPTYPE_INT64:
            member += f'v_int64 = {value}'
        elif cpptype == FieldDescriptor.CPPTYPE_UINT32:
            member += f'v_uint32 = {value}'
        elif cpptype == FieldDescriptor.CPPTYPE_UINT64:
            member += f'v_uint64 = {value}'
        elif cpptype == FieldDescriptor.CPPTYPE_DOUBLE:
            member += f'v_double = {value}'
        elif cpptype == FieldDescriptor.CPPTYPE_FLOAT:
            member += f'v_float = {value}'
        elif cpptype == FieldDescriptor.CPPTYPE_BOOL:
            member += f'v_bool = {value}'
        elif cpptype == FieldDescriptor.CPPTYPE_STRING:
            member += f'v_string = {value}'
        elif cpptype == FieldDescriptor.CPPTYPE_MESSAGE:
            # Assuming value is a serialized string or similar
            member += f'v_bytes = {value}'
        else:
            raise ValueError(f"Unsupported C++ type: {cpptype}")

        return member

    def render(self, element: ProtoElement) -> Dict:
        """Accumulate option macro lines for *element* (string fields only)."""
        result = {}
        if len(element.childs) > 0:
            for child in element.childs:
                child.render()
            return self.repeated_render(element, result)

        # Resolve the custom options (field level wins over message level).
        # The original also performed a duplicate direct lookup (including a
        # dead options.get('') call) that was immediately overwritten; removed.
        init_from_mac = self.get_option(element, 'init_from_mac') or False
        const_prefix = self.get_option(element, 'const_prefix') or False
        read_only = self.get_option(element, 'read_only') or False
        default_value = self.get_option(element, 'default_value') or False
        global_name = self.get_option(element, 'global_name') or False

        if element.descriptor.cpp_type in [FieldDescriptor.CPPTYPE_STRING] and (init_from_mac or const_prefix or read_only or default_value or global_name):
            # First option-bearing field of this message type: open the
            # *_ALL_MEMBERS_* macro lists and record the header include.
            if not self.all_members_defaults.get(f'{PROCESSED_PREFIX}{element.cpp_type}'):
                self.get_mkey(element.cpp_child).append(f'#define {element.cpp_type}_ALL_MEMBERS_CHILD(msg,member)')
                self.get_mkey(element.cpp_root).append(f'#define {element.cpp_type}_ALL_MEMBERS_ROOT ')
                c_source, c_header = self.get_source_names(Path(element.file.name).stem)
                self.get_mkey(f'{element.cpp_type}_INCLUDES').append(f'#include "{c_header}"')
                # Consistency: use the shared PROCESSED_PREFIX constant instead
                # of the hard-coded 'processed_' literal (same value).
                self.all_members_defaults[f'{PROCESSED_PREFIX}{element.cpp_type}'] = True

            member_prefix = f'{element.cpp_type_member.replace(".","_")}_OPTIONS'
            init_from_mac_str = 'true' if init_from_mac else 'false'
            const_prefix_str = f'"{const_prefix}"' if const_prefix else '""'
            read_only_str = "true" if read_only else "false"
            default_value_str = f'"{default_value}"' if default_value else '""'
            global_name_str = f'"{global_name}"' if global_name else '""'
            # Fixed: get_nanoppb_option returns None when 'max_length' is not
            # set and None > 0 raises TypeError; treat absent as 0.
            opt_member = 'STRING_ARRAY' if (self.get_nanoppb_option(element, 'max_length') or 0) > 0 else 'STRING_POINTER'
            opt_member_type = opt_member + '_MEMBER'

            self.get_mkey(element.cpp_type).append(f'#define {member_prefix} {opt_member_type}({init_from_mac_str}, {const_prefix_str}, {read_only_str}, {default_value_str}, {global_name_str})')
            if element.detached_leading_comments:
                logger.debug(f'{element.detached_leading_comments}')
            logger.info(f'INITFROMMAC: {self.global_name}{element.path}')
            self.get_mkey(element.cpp_child).append(f'{opt_member}(msg, member.{element.cpp_member},{opt_member_type}) ')
            self.get_mkey(element.cpp_root).append(f'{opt_member}({element.cpp_type},{element.cpp_member},{opt_member_type})')

        return self.repeated_render(element, result)

    def add_file(self, name: str, stream: io.StringIO):
        """Move a finished StringIO buffer into the protoc response."""
        if stream and not stream.closed:
            f = self.response.file.add()
            f.name = name
            f.content = stream.getvalue()
            stream.close()

    def get_source_names(self, name) -> tuple:
        """Return the (c, h) file names for a .proto stem."""
        csource_name = f'{name}{file_suffix}'
        return (f'{csource_name}.c', f'{csource_name}.h')

    def set_source(self, name):
        """Flush any open C buffers, then (when *name* given) open fresh ones."""
        if hasattr(self, "c_source"):
            self.add_file(self.c_source_name, self.c_source)
            self.add_file(self.c_header_name, self.c_header)
        if name:
            self.c_source_name, self.c_header_name = self.get_source_names(name)

            self.c_header = io.StringIO()
            self.c_header.write(head_comments)
            self.c_header.write('#pragma once\n')
            self.c_header.write('#include "sys_options.h"\n')
            self.c_header.write('#include "configuration.pb.h"\n')
            self.c_source = io.StringIO()
            self.c_source.write(head_comments)
            self.c_source.write(f'#include "{self.c_header_name}"\n')
|
||||
if __name__ == '__main__':
    # Plugin entry point: protoc feeds the serialized request on stdin.
    request_data = ProtocParser.get_data()
    logger.info("Generating c source file(s) for options")
    options_plugin = OptionsParser(request_data)
    options_plugin.process()
    logger.info('Done generating c source file(s) for options')
|
||||
Reference in New Issue
Block a user