Start of 5.X work

This commit is contained in:
Sebastien L
2025-03-18 17:38:34 -04:00
parent c0ddf0a997
commit 73bd096f37
442 changed files with 227862 additions and 21075 deletions

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
from functools import partial
from typing import ClassVar, List, Any, Dict
from typing import Callable
from google.protobuf import message
from google.protobuf import message, descriptor_pb2
from google.protobuf.descriptor import Descriptor, FieldDescriptor, FileDescriptor
from google.protobuf.descriptor_pb2 import FieldDescriptorProto,DescriptorProto,EnumDescriptorProto
import google.protobuf.descriptor_pool as descriptor_pool
@@ -23,6 +23,8 @@ class ProtoElement:
package:str
file:FileDescriptor
message:str
_positions: Dict[str,tuple]
position: tuple
options:Dict[str,any]
_message_instance:ClassVar
@classmethod
@@ -31,6 +33,9 @@ class ProtoElement:
@classmethod
def set_comments_base(cls,comments:Dict[str,str]):
cls._comments = comments
@classmethod
def set_positions_base(cls,positions:Dict[str,tuple]):
cls._positions = positions
@classmethod
def set_pool(cls,pool:descriptor_pool.DescriptorPool):
cls.pool = pool
@@ -82,6 +87,8 @@ class ProtoElement:
self.render = partial(self.render_class, self)
self.comments = {comment.split('.')[-1]:self._comments[comment] for comment in self._comments.keys() if comment.startswith(self.path)}
self.position = self._positions.get(self.path)
@property
def cpp_type(self)->str:
return f'{self.package}_{self.descriptor.containing_type.name}'
@@ -117,10 +124,36 @@ class ProtoElement:
@property
def cpp_child(self):
return f'{self.cpp_type}_CHILD'
@property
def proto_file_line(self) -> str:
    """Return a ``file:line`` locator for this element.

    Falls back to the bare proto file name when no source position was
    recorded for this element's path.
    """
    if not self.position:
        return f"{self.file.name}"
    start_line, _start_column, _end_line = self.position
    return f"{self.file.name}:{start_line}"
@property
def message_instance(self):
    """The protobuf message instance backing this element.

    Prefers this element's own ``_message_instance``; otherwise defers to the
    parent's ``message_instance`` (walking up recursively), or ``None`` at
    the root.
    """
    if hasattr(self, '_message_instance'):
        return self._message_instance
    return getattr(self.parent, 'message_instance', None)
@property
def new_message_instance(self):
    """Create a fresh prototype instance for a message-typed element.

    Returns ``None`` when this element is not a message field. The prototype
    is looked up by the referenced message type's full name first, then by
    this descriptor's own full name as a fallback.
    """
    if self.type == FieldDescriptor.TYPE_MESSAGE:
        try:
            # Primary lookup: the full name of the message type this field refers to
            return self.prototypes[self.descriptor.message_type.full_name]()
        except KeyError:
            # Fallback: some descriptors register under their own full name
            # (e.g. when the element itself is the message definition).
            # self.logger.error(f'Could not find instance for {self.descriptor.full_name}')
            return self.prototypes[self.descriptor.full_name]()
    else:
        # Scalar/enum fields have no message instance to create
        return None
@property
def tree(self) -> str:
    """Render this element and its descendants as a compact one-line tree."""
    if not self.childs:
        return f'{self.name}'
    subtree = ', '.join(child.tree for child in self.childs)
    return f'{self.name}->({subtree})'

View File

@@ -1,4 +1,4 @@
# !/usr/bin/env python
#!/opt/esp/python_env/idf4.4_py3.8_env/bin/python
from functools import partial
import sys
import json
@@ -28,11 +28,13 @@ class ProtocParser(ABC) :
response:plugin.CodeGeneratorResponse
elements:List[ProtoElement] = []
comments: Dict[str, str] = {}
positions={}
json_content = {}
main_class_list:List[str] = []
param_dict:Dict[str,str] = {}
pool:descriptor_pool.DescriptorPool
factory:message_factory
message_type_names:set = set()
@abstractmethod
def render(self,element: ProtoElement):
@@ -63,7 +65,7 @@ class ProtocParser(ABC) :
def __init__(self,data):
self.request = plugin.CodeGeneratorRequest.FromString(data)
self.response = plugin.CodeGeneratorResponse()
logger.info(f'Received ${self.get_name()} parameter(s): {self.request.parameter}')
logger.debug(f'Received ${self.get_name()} parameter(s): {self.request.parameter}')
params = self.request.parameter.split(',')
self.param_dict = {p.split('=')[0]: parse.unquote(p.split('=')[1]) for p in params if '=' in p}
if not 'const_prefix' in self.param_dict:
@@ -165,7 +167,19 @@ class ProtocParser(ABC) :
self.comments[f"{element_identifier}.trailing"] = trailing_comments
self.comments[f"{element_identifier}.detached"] = leading_detached_comments
def extract_positions(self, proto_file: FileDescriptorProto):
    """Record the source position of every addressable element of *proto_file*.

    Populates ``self.positions`` with ``identifier -> (start_line,
    start_column, end_line)`` tuples, converted from protobuf's 0-based
    spans to 1-based positions suitable for editor-style "file:line" output.
    """
    for location in proto_file.source_code_info.location:
        # The path is a sequence of integers identifying the syntactic location
        path = tuple(location.path)
        # Map the numeric path onto a dotted element identifier
        element_identifier = self.interpret_path(path, proto_file)
        if element_identifier is not None and not element_identifier.endswith('.'):
            # Spans are [start_line, start_column, end_line(, end_column)];
            # shorter spans carry no usable position, so they are skipped.
            if len(location.span) >= 3:
                start_line, start_column, end_line = location.span[:3]
                # Convert from 0-based to 1-based indexing before storing
                self.positions[element_identifier] = (start_line + 1, start_column + 1, end_line + 1)
def get_comments(self,field: FieldDescriptorProto, proto_file: FileDescriptorProto,message: DescriptorProto):
if hasattr(field,'name') :
@@ -212,7 +226,29 @@ class ProtocParser(ABC) :
return list(set([proto_file.package for proto_file in self.request.proto_file if proto_file.package]))
@property
def file_set(self) -> List[FileDescriptor]:
    """Return the unique FileDescriptors containing every main message class.

    Resolves each name in ``self.main_class_list`` against the descriptor
    pool and collects the owning file descriptors, deduplicated.

    Raises:
        Exception: if any name cannot be resolved; the error log lists every
            valid message name so the caller can fix the configuration.
    """
    file_set = []
    missing_messages = []
    # NOTE: loop variable renamed from `message`, which shadowed the imported
    # google.protobuf `message` module.
    for message_name in self.main_class_list:
        try:
            message_descriptor = self.pool.FindMessageTypeByName(message_name)
        except Exception:
            message_descriptor = None
        if message_descriptor:
            file_set.append(message_descriptor.file)
        else:
            missing_messages.append(message_name)
    if missing_messages:
        sortedstring = "\n".join(sorted(self.message_type_names))
        logger.error(f'Error retrieving message definitions for: {", ".join(missing_messages)}. Valid messages are: \n{sortedstring}')
        raise Exception(f"Invalid message(s) {missing_messages}")
    # Deduplicate file descriptors (ordering is not significant to callers)
    return list(set(file_set))
@property
def proto_files(self)->List[FileDescriptorProto]:
@@ -232,32 +268,46 @@ class ProtocParser(ABC) :
return
self.setup()
logger.info(f'Processing message(s) {", ".join([name for name in self.main_class_list ])}')
for fileObj in self.file_set :
self.start_file(fileObj)
for message in self.get_main_messages_from_file(fileObj):
element = ProtoElement( descriptor=message )
self.start_message(element)
self.process_message(element)
self.end_message(element)
self.end_file(fileObj)
sys.stdout.buffer.write(self.response.SerializeToString())
try:
for fileObj in self.file_set :
self.start_file(fileObj)
for message in self.get_main_messages_from_file(fileObj):
element = ProtoElement( descriptor=message )
self.start_message(element)
self.process_message(element)
self.end_message(element)
self.end_file(fileObj)
sys.stdout.buffer.write(self.response.SerializeToString())
except Exception as e:
# Log the error and exit gracefully
error_message = str(e)
logger.error(f'Failed to process protocol buffer files: {error_message}')
sys.stderr.write(error_message + '\n')
sys.exit(1) # Exit with a non-zero status code to indicate failure
def setup(self):
    """Prepare parser state before processing.

    Extracts comments and source positions from every input file, builds a
    private descriptor pool and message factory, and pushes the shared state
    into ProtoElement's class-level configuration hooks.
    """
    for proto_file in self.proto_files:
        logger.debug(f"Extracting comments from : {proto_file.name}")
        self.extract_positions(proto_file)
        self.extract_comments(proto_file)
    self.pool = descriptor_pool.DescriptorPool()
    self.factory = message_factory.MessageFactory(self.pool)
    for proto_file in self.request.proto_file:
        logger.debug(f'Adding {proto_file.name} to pool')
        self.pool.Add(proto_file)
        # Track every top-level message name so diagnostics (see file_set)
        # can list the valid choices when a lookup fails.
        for message_type in proto_file.message_type:
            self.message_type_names.add(f"{proto_file.package}.{message_type.name}")
    self.messages = GetMessageClassesForFiles([f.name for f in self.request.proto_file], self.pool)
    ProtoElement.set_pool(self.pool)
    ProtoElement.set_render(self.render)
    ProtoElement.set_logger(logger)
    ProtoElement.set_comments_base(self.comments)
    ProtoElement.set_positions_base(self.positions)
    ProtoElement.set_prototypes(self.messages)
@property

View File

@@ -0,0 +1,48 @@
import sys
import pkg_resources
import subprocess
def check_packages(packages):
    """Return the subset of *packages* that is not installed or unusable.

    A package whose installed version conflicts with the requirement is
    treated as missing so the caller can attempt a (re)install; previously
    ``VersionConflict`` escaped uncaught and crashed the whole check.
    """
    missing_packages = []
    for package in packages:
        try:
            pkg_resources.require(package)
        except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict):
            missing_packages.append(package)
    return missing_packages
def install_packages(packages):
    """Install each package via pip; return True only if every install succeeds."""
    for pkg in packages:
        print(f"Installing {pkg}...")
        try:
            # Use the running interpreter's pip so the install lands in the
            # same environment this script is checking.
            subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])
        except subprocess.CalledProcessError:
            print(f"Failed to install {pkg}.")
            return False
    return True
# Packages the protoc generator tool-chain needs at generation time.
required_packages = ["protobuf", "grpcio-tools"]
missing_packages = check_packages(required_packages)
if missing_packages:
    print("Missing required Python packages:", ", ".join(missing_packages))
    if install_packages(missing_packages):
        # Re-verify: pip can report success without actually satisfying the
        # requirement (e.g. version pin mismatch).
        missing_packages = check_packages(required_packages)
        if missing_packages:
            print("Still missing required Python packages after installation attempt:", ", ".join(missing_packages))
            sys.exit(1)
    else:
        sys.exit(1)
print("All required Python packages are installed.")
# Check for the marker file path argument
if len(sys.argv) < 2:
    print("Error: No marker file path provided.")
    sys.exit(1)
marker_file_path = sys.argv[1]
# Touch a marker file so the build system can skip this (slow) check next time.
with open(marker_file_path, "w") as marker_file:
    marker_file.write("Python packages check completed successfully.")
sys.exit(0)

View File

@@ -0,0 +1,37 @@
import binascii
import os
import sys
# Resolve the generated-protobuf module directories relative to this script.
current_dir = os.path.dirname(os.path.realpath(__file__))
protobuf_py_path = os.path.join(current_dir, '..', '..', 'build', 'protobuf', 'py')
nanopb_generator_proto_path = os.path.join(current_dir, '..', '..', 'build', 'protobuf', 'proto', 'nanopb', 'generator', 'proto')
# Adding paths to sys.path so the generated *_pb2 modules import below resolves
sys.path.append(protobuf_py_path)
sys.path.append(nanopb_generator_proto_path)
import configuration_pb2
def convert_string_to_bytes(input_string):
    """Decode a console-logged escape string (e.g. ``\\x0a\\x01``) into raw bytes."""
    escaped = input_string.encode("utf-8")
    # unicode_escape resolves the \xNN sequences; latin1 maps code points
    # 0-255 back onto single bytes without altering them.
    return escaped.decode("unicode_escape").encode("latin1")
def main():
    """Interactively decode a pasted escape string into a Config message."""
    print(f'Utility to decode the content of an encoded string as copied from the console logs')
    # Read the escaped string from the user and turn it back into raw bytes
    raw = convert_string_to_bytes(input("Enter the protobuf data string: "))
    decoded = configuration_pb2.Config()
    decoded.ParseFromString(raw)
    # Protobuf messages pretty-print their populated fields on str()/print()
    print(decoded)

if __name__ == "__main__":
    main()

View File

@@ -1,3 +1,4 @@
#!/opt/esp/python_env/idf4.4_py3.8_env/bin/python
import json
import os
import sys
@@ -55,6 +56,7 @@ def main():
parser.add_argument('--target_dir', help='Target directory for output files')
parser.add_argument('--include', help='Directory where message python files can be found', default=None,action = 'append' )
parser.add_argument('--json', help='Source JSON file(s)',action = 'append' )
parser.add_argument('--dumpconsole', action='store_true', help='Dump to console')
args = parser.parse_args()
@@ -63,22 +65,44 @@ def main():
proto_module = load_protobuf_module(args.proto_file, args.include)
# Determine the main message class
main_message_class = getattr(proto_module, args.main_class)
try:
main_message_class = getattr(proto_module, args.main_class)
except Exception as e:
content:list = [entry for entry in dir(proto_module) if not entry.startswith('_') ]
logging.error(f'Error getting main class: {e}. Available classes: {", ".join(content)}')
sys.exit(1) # Exit with error status
message = main_message_class()
proto_base_name = Path(args.proto_file).stem
for jsonfile in args.json:
output_file_base = os.path.join(args.target_dir, Path(jsonfile).stem+".bin")
output_file_base:str = os.path.join(args.target_dir, Path(jsonfile).stem+".bin").lower()
logging.debug(f'Converting JSON file {jsonfile} to binary format')
with open(jsonfile, 'r') as json_file:
json_data = json.load(json_file)
json_format.ParseDict(json_data, message)
binary_data = message.SerializeToString()
with open(output_file_base, 'wb') as bin_file:
bin_file.write(binary_data)
logging.info(f'Binary file written to {output_file_base}')
try:
json_data = json.load(json_file)
try:
json_format.ParseDict(json_data, message)
except json_format.ParseError as e:
logging.error(f'Parse error in JSON file {jsonfile}: {e}')
sys.exit(1) # Exit with error status
except Exception as e:
logging.error(f'Error reading JSON file {jsonfile}: {e}')
sys.exit(1) # Exit with error status
binary_data = message.SerializeToString()
with open(output_file_base, 'wb') as bin_file:
bin_file.write(binary_data)
logging.info(f'Binary file written to {output_file_base}')
if args.dumpconsole:
escaped_string = ''.join('\\x{:02x}'.format(byte) for byte in binary_data)
print(f'escaped string representation: \nconst char * bin_{Path(jsonfile).stem} = "{escaped_string}";\n')
except Exception as e:
logging.error(f'Error reading JSON file {jsonfile}: {e}')
sys.exit(1) # Exit with error status
if __name__ == '__main__':
main()

View File

@@ -1,3 +1,4 @@
#!/opt/esp/python_env/idf4.4_py3.8_env/bin/python
import json
import os
import sys
@@ -66,14 +67,10 @@ def main():
main_message_class = getattr(proto_module, args.main_class)
message = main_message_class()
proto_base_name = Path(args.proto_file).stem
import configuration_pb2
config = configuration_pb2.Config()
with open(args.source, 'rb') as bin_file: # Open in binary mode
data =bin_file.read()
config.ParseFromString(data)
logging.info(f'Parsed: {json.dumps(config)}')
message.ParseFromString(data)
logging.info(f'Parsed: {MessageToJson(message)}')
# for jsonfile in args.json:

View File

@@ -0,0 +1,238 @@
cmake_minimum_required(VERSION 3.16)
# Set a variable to the project root directory
# Function to find the project root directory by looking for the "tools" directory
# Walk upward from start_dir until a directory containing "sdkconfig" is
# found. Sets ${project_root} in the caller's scope, or aborts the configure
# step with FATAL_ERROR when no such directory exists.
function(find_project_root_dir start_dir project_root)
    set(next_dir ${start_dir})
    set(found FALSE)
    while(NOT found AND NOT "${next_dir}" STREQUAL "")
        message(STATUS "Checking ${next_dir} for sdkconfig")
        if(EXISTS "${next_dir}/sdkconfig")
            set(found TRUE)
            set(${project_root} ${next_dir} PARENT_SCOPE)
        else()
            get_filename_component(parent_dir ${next_dir} DIRECTORY)
            # At the filesystem root get_filename_component() returns its
            # input unchanged, which made the original loop spin forever
            # whenever sdkconfig was missing. Bail out when we stop moving.
            if("${parent_dir}" STREQUAL "${next_dir}")
                break()
            endif()
            set(next_dir ${parent_dir})
        endif()
    endwhile()
    if(NOT found)
        message(FATAL_ERROR "Unable to find the project root directory containing 'sdkconfig'.")
    endif()
endfunction()
# Call the function to find the project root directory
find_project_root_dir(${CMAKE_CURRENT_SOURCE_DIR} PROJECT_ROOT_DIR)
# Make the nanopb CMake helper modules discoverable via include()/find_package()
list(APPEND CMAKE_MODULE_PATH "${PROJECT_ROOT_DIR}/components/spotify/cspot/bell/external/nanopb/extra")
set(TOOLS_DIR "${PROJECT_ROOT_DIR}/tools" )
# Output locations for generated sources and packed assets
set(GENERATED_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/generated")
set(GENERATED_PROTOBUF_ROOT "${CMAKE_BINARY_DIR}/protobuf")
set(GENERATED_PY_DIRECTORY "${GENERATED_PROTOBUF_ROOT}/py")
set(GENERATED_JS_DIRECTORY "${GENERATED_PROTOBUF_ROOT}/js")
set(GENERATED_SPIFFS_DIRECTORY "${CMAKE_BINARY_DIR}/spiffs")
# NOTE(review): FindPythonInterp is deprecated in favor of FindPython3 —
# confirm the minimum CMake across build hosts before migrating.
find_package(PythonInterp REQUIRED)
# Function to replace a placeholder in a list
# Replace every literal occurrence of PLACEHOLDER with REPLACEMENT in each
# entry of the list variable named INPUT_LIST, writing the rewritten list to
# OUTPUT_LIST in the caller's scope.
function(replace_in_list INPUT_LIST PLACEHOLDER REPLACEMENT OUTPUT_LIST)
    set(rewritten "")
    foreach(entry ${${INPUT_LIST}})
        string(REPLACE ${PLACEHOLDER} ${REPLACEMENT} entry ${entry})
        list(APPEND rewritten "${entry}")
    endforeach()
    set(${OUTPUT_LIST} ${rewritten} PARENT_SCOPE)
endfunction()
# Percent-encode characters in the variable named INPUT_VAR that would break
# when passed through protoc plugin parameters (comma/equals delimited, and
# URL-decoded on the Python side via urllib.parse.unquote). The result is
# written back into the same variable in the caller's scope.
function(encode_special_chars INPUT_VAR)
    set(ENCODED_STRING "${${INPUT_VAR}}")
    # Encoding common special characters. "%" MUST be replaced first so that
    # the "%" introduced by the later encodings is not itself re-encoded.
    string(REPLACE "%" "%25" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE ":" "%3A" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "-" "%2D" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "=" "%3D" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "&" "%26" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "?" "%3F" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "/" "%2F" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE " " "%20" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "!" "%21" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "@" "%40" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "#" "%23" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "$" "%24" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "^" "%5E" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "*" "%2A" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "(" "%28" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE ")" "%29" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "+" "%2B" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "{" "%7B" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "}" "%7D" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "[" "%5B" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "]" "%5D" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "|" "%7C" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "\\" "%5C" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE ";" "%3B" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "'" "%27" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "\"" "%22" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "<" "%3C" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE ">" "%3E" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "," "%2C" ENCODED_STRING "${ENCODED_STRING}")
    string(REPLACE "`" "%60" ENCODED_STRING "${ENCODED_STRING}")
    # Add more replacements as needed
    # Return the encoded string through the same variable name
    set(${INPUT_VAR} "${ENCODED_STRING}" PARENT_SCOPE)
endfunction()
# Join the entries of the list variable named VAR_NAME with DELIMITER,
# exposing the result as ${VAR_NAME}_DELIMITED in the caller's scope.
# NOTE(review): the result keeps a trailing DELIMITER — the removal below is
# deliberately commented out; confirm downstream consumers rely on it.
function(array_to_delimited DELIMITER VAR_NAME)
    # Initialize the result variable
    set(RESULT "")
    set(ARR "${${VAR_NAME}}")
    # Determine if ARR is a list
    list(LENGTH ARR ARR_LENGTH)
    if(${ARR_LENGTH} GREATER 0)
        # Handle ARR as ITEMS
        foreach(ARRAY_ENTRY IN ITEMS ${ARR})
            set(RESULT "${RESULT}${ARRAY_ENTRY}${DELIMITER}")
        endforeach()
    else()
        # Fallback: treat ARR's value as the *name* of another list variable.
        # NOTE(review): when ARR_LENGTH is 0 this loop iterates nothing unless
        # ARR actually names a variable — verify this branch is ever taken.
        foreach(ARRAY_ENTRY IN LISTS ${ARR})
            set(RESULT "${RESULT}${ARRAY_ENTRY}${DELIMITER}")
        endforeach()
    endif()
    # Remove the trailing delimiter
    # string(REGEX REPLACE "${DELIMITER}$" "" RESULT "${RESULT}")
    # encode_special_chars(RESULT)
    set(${VAR_NAME}_DELIMITED "${RESULT}" PARENT_SCOPE)
endfunction()
# Log the entries of ARR prefixed by MSG. Pass the optional literal "NEWLINE"
# as a third argument to print one entry per line instead of a single line.
# NOTE(review): ARR is received by value, so the "IN LISTS ${ARR}" branches
# treat that value as a variable *name*; verify callers pass names there.
function(print_array MSG ARR)
    # Determine if ARR is a list
    list(LENGTH ARR ARR_LENGTH)
    # Check for the optional parameter to print each item on a new line
    list(LENGTH ARGN ARG_COUNT)
    if(ARG_COUNT EQUAL 1)
        # Get the first (and only) item in ARGN
        list(GET ARGN 0 ARGN_FIRST_ITEM)
    endif()
    if(ARG_COUNT EQUAL 1 AND ARGN_FIRST_ITEM STREQUAL "NEWLINE")
        if(${ARR_LENGTH} GREATER 0)
            message(STATUS "${MSG} [ITEMS]")
            foreach(ARRAY_ENTRY IN ITEMS ${ARR})
                message(STATUS " - ${ARRAY_ENTRY}")
            endforeach()
        else()
            message(STATUS "${MSG} [LISTS]")
            foreach(ARRAY_ENTRY IN LISTS ${ARR})
                message(STATUS " - ${ARRAY_ENTRY}")
            endforeach()
        endif()
    else()
        # Default behavior: concatenate the message and array entries
        set(OUTSTRING "")
        if(${ARR_LENGTH} GREATER 0)
            foreach(ARRAY_ENTRY IN ITEMS ${ARR})
                set(OUTSTRING "${OUTSTRING} ${ARRAY_ENTRY}")
            endforeach()
        else()
            foreach(ARRAY_ENTRY IN LISTS ${ARR})
                set(OUTSTRING "${OUTSTRING} ${ARRAY_ENTRY}")
            endforeach()
        endif()
        message(STATUS "${MSG}${OUTSTRING}")
    endif()
endfunction()
# Select the host-specific tool binaries (protoc, protodot, the JS code
# generator) and the protoc plugin suffix, exporting every value into the
# caller's scope.
function(configure_env)
    if(NOT CMAKE_HOST_WIN32)
        # Any non-Windows host is assumed to be a Linux x86_64 machine.
        set(PROTOC_PLUGIN_SUFFIX ".py" PARENT_SCOPE)
        set(PROTOC_BINARY "${TOOLS_DIR}/protobuf/linux-x86_64/bin/protoc" PARENT_SCOPE)
        set(PROTOBUF_JS_BINARY "${TOOLS_DIR}/protobuf-javascript/linux-x86_64/bin/protoc-gen-js" PARENT_SCOPE)
        set(PROTOBUF_INCLUDE_DIR "${TOOLS_DIR}/protobuf/linux-x86_64/include/google/protobuf" PARENT_SCOPE)
        set(PROTODOT_BINARY "${TOOLS_DIR}/protodot/binaries/protodot-linux-amd64" PARENT_SCOPE)
        set(CONFIG_FILE "${TOOLS_DIR}/protodot/config.json" PARENT_SCOPE)
    else()
        set(PROTOC_PLUGIN_SUFFIX ".bat" PARENT_SCOPE)
        set(PROTOC_BINARY "${TOOLS_DIR}/protobuf/win64/bin/protoc.exe" PARENT_SCOPE)
        set(PROTOBUF_JS_BINARY "${TOOLS_DIR}/protobuf-javascript/win64/bin/protoc-gen-js.exe" PARENT_SCOPE)
        set(PROTOBUF_INCLUDE_DIR "${TOOLS_DIR}/protobuf/win64/include/google/protobuf" PARENT_SCOPE)
        set(PROTODOT_BINARY "${TOOLS_DIR}/protodot/binaries/protodot-windows-amd64.exe" PARENT_SCOPE)
        set(CONFIG_FILE "${TOOLS_DIR}/protodot/config-win.json" PARENT_SCOPE)
    endif()
endfunction()
# Make "clean" also remove everything generated under the protobuf/ directory.
set_property(DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}"
    APPEND PROPERTY
    ADDITIONAL_MAKE_CLEAN_FILES "${CMAKE_BINARY_DIR}/protobuf"
)
# Search path handed to protoc (semicolon-delimited; the second entry comes
# from the nanopb generator sources).
set(PROTO_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/proto;${NANOPB_GENERATOR_SOURCE_DIR}/proto" )
# Path to the marker file
set(MARKER_FILE "${CMAKE_BINARY_DIR}/python_requirements_met.txt")
# Check if the marker file exists — run the (slow) pip check only once per
# build directory; the Python script creates the marker on success.
if(NOT EXISTS ${MARKER_FILE})
    # Marker file doesn't exist, run the Python script
    execute_process(
        COMMAND ${PYTHON_EXECUTABLE} ${TOOLS_DIR}/protoc_utils/check_python_packages.py ${MARKER_FILE}
        RESULT_VARIABLE PYTHON_PACKAGES_CHECK_RESULT
    )
    # Check the result of the Python script
    if(NOT PYTHON_PACKAGES_CHECK_RESULT EQUAL 0)
        message(FATAL_ERROR "Python package requirements not satisfied.")
    endif()
else()
    message(STATUS "Python package requirements already satisfied.")
endif()
# Register copy_if_different commands for SRC_FILES (paths under SRC_BASE_DIR)
# into DEST_BASE_DIR, preserving the relative layout, and create one custom
# target depending on all copies. Exports CUSTOM_COPY_TARGET to the caller and
# appends the target to the global CUSTOM_COPY_TARGETS property.
function(copy_files SRC_FILES SRC_BASE_DIR DEST_BASE_DIR)
    # Derive a unique, path-safe target name from the destination directory.
    # This depends only on DEST_BASE_DIR, so compute it once up front (the
    # original recomputed it on every loop iteration and left it undefined
    # when SRC_FILES was empty, breaking add_custom_target below).
    string(REPLACE "${CMAKE_BINARY_DIR}" "" TARGET_NAME ${DEST_BASE_DIR})
    string(REPLACE "/" "_" TARGET_NAME ${TARGET_NAME})
    string(REPLACE "\\" "_" TARGET_NAME ${TARGET_NAME})
    set(COPIED_FILES "")
    foreach(SRC_FILE ${SRC_FILES})
        # Get the relative path of the source file
        file(RELATIVE_PATH RELATIVE_SRC_FILE "${SRC_BASE_DIR}" ${SRC_FILE})
        # Compute the destination file path
        set(DEST_FILE "${DEST_BASE_DIR}/${RELATIVE_SRC_FILE}")
        # Create the directory structure in the destination
        get_filename_component(DEST_DIR ${DEST_FILE} DIRECTORY)
        file(MAKE_DIRECTORY ${DEST_DIR})
        # Copy at build time, only when contents differ
        add_custom_command(
            OUTPUT ${DEST_FILE}
            COMMAND ${CMAKE_COMMAND} -E copy_if_different ${SRC_FILE} ${DEST_FILE}
            DEPENDS ${SRC_FILE}
            COMMENT "Copying ${RELATIVE_SRC_FILE} to ${DEST_BASE_DIR}"
        )
        list(APPEND COPIED_FILES ${DEST_FILE})
    endforeach()
    # Aggregate target that triggers all the copies
    add_custom_target(copy_${TARGET_NAME} DEPENDS ${COPIED_FILES})
    set(CUSTOM_COPY_TARGET copy_${TARGET_NAME} PARENT_SCOPE)
    set_property(GLOBAL APPEND PROPERTY CUSTOM_COPY_TARGETS copy_${TARGET_NAME})
endfunction()

View File

@@ -1,4 +1,4 @@
# !/usr/bin/env python
#!/opt/esp/python_env/idf4.4_py3.8_env/bin/python
import os
import logging
@@ -11,12 +11,19 @@ from google.protobuf.descriptor_pb2 import FileDescriptorProto, DescriptorProto,
from google.protobuf.descriptor import FieldDescriptor, Descriptor, FileDescriptor
from ProtoElement import ProtoElement
from ProtocParser import ProtocParser
from google.protobuf.json_format import Parse
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
def is_iterable(obj):
try:
iter(obj)
return True
except TypeError:
return False
class BinDefaultsParser(ProtocParser) :
def start_message(self,message:ProtoElement) :
super().start_message(message)
@@ -25,12 +32,17 @@ class BinDefaultsParser(ProtocParser) :
self.has_error = False
default_structure = message.render()
if not default_structure:
logger.warn(f'No default values for {message.name}')
logger.warning(f'No default values for {message.name}')
return
respfile = self.response.file.add()
outfilename = f'{message.name}_defaults_pb.bin'
with open(os.path.join(self.param_dict.get('defaultspath','.'),outfilename), 'wb') as bin_file:
outfilename = f'{message.name.lower()}.bin'
output_directory = os.path.join(self.param_dict.get('defaultspath', '.'),"defaults")
output_path = os.path.join(output_directory, outfilename)
os.makedirs(output_directory, exist_ok=True)
with open(output_path, 'wb') as bin_file:
res = default_structure.SerializeToString()
bin_file.write(res)
logger.info(f'Wrote {bin_file.name}')
@@ -48,13 +60,14 @@ class BinDefaultsParser(ProtocParser) :
def get_name(self)->str:
return 'protoc_plugin_defaults'
def add_comment_if_exists(element, comment_type: str, path: str) -> dict:
comment = getattr(element, f"{comment_type}_comment", "").strip()
return {f"__{comment_type}_{path}": comment} if comment else {}
def repeated_render(self,element:ProtoElement,obj:any):
return [obj] if element.repeated else obj
def render(self,element: ProtoElement):
options = element.options['cust_field'] if 'cust_field' in element.options else None
if len(element.childs)>0:
oneof = getattr(element.descriptor,'containing_oneof',None)
if oneof:
@@ -62,66 +75,137 @@ class BinDefaultsParser(ProtocParser) :
pass
has_render = False
for child in element.childs:
rendered = child.render()
if rendered:
try:
rendered = child.render()
if rendered:
has_render = True
# try:
if child.type == FieldDescriptor.TYPE_MESSAGE:
target_field = getattr(element.message_instance, child.name)
if child.label == FieldDescriptor.LABEL_REPEATED:
# If the field is repeated, iterate over the array and add each instance
if is_iterable(rendered) and not isinstance(rendered, str):
for instance in rendered:
target_field.add().CopyFrom(instance)
else:
target_field.add().CopyFrom(rendered)
else:
# For non-repeated fields, use CopyFrom
target_field.CopyFrom(rendered)
elif child.repeated:
try:
getattr(element.message_instance,child.name).extend(rendered)
except:
getattr(element.message_instance,child.name).extend( [rendered])
else:
setattr(element.message_instance,child.name,rendered)
# except:
# logger.error(f'Unable to assign value from {child.fullname} to {element.fullname}')
element.message_instance.SetInParent()
except Exception as e:
logger.error(f'{child.proto_file_line} Rendering default values failed for {child.name} of {child.path} in file {child.file.name}: {e}')
raise e
if getattr(options, 'v_msg', None):
has_render = True
# try:
if child.repeated:
try:
getattr(element.message_instance,child.name).extend(rendered)
except:
getattr(element.message_instance,child.name).extend( [rendered])
v_msg = getattr(options, 'v_msg', None)
try:
if element.repeated:
# Create a list to hold the message instances
message_instances = []
# Parse each element of the JSON array
for json_element in json.loads(v_msg):
new_instance = element.new_message_instance
Parse(json.dumps(json_element), new_instance)
message_instances.append(new_instance)
element.message_instance.SetInParent()
return message_instances
# Copy each instance to the appropriate field in the parent message
# repeated_field = getattr(element.message_instance, child.name)
# for instance in message_instances:
# repeated_field.add().CopyFrom(instance)
else:
# If the field is not repeated, parse the JSON string directly
Parse(v_msg, element.message_instance)
element.message_instance.SetInParent()
except Exception as e:
# Handle parsing errors, e.g., log them
logger.error(f"{element.proto_file_line} Error parsing json default value {v_msg} as JSON. {e}")
raise e
elif child.type == FieldDescriptor.TYPE_MESSAGE:
getattr(element.message_instance,child.name).CopyFrom(rendered)
else:
setattr(element.message_instance,child.name,rendered)
# except:
# logger.error(f'Unable to assign value from {child.fullname} to {element.fullname}')
element.message_instance.SetInParent()
if not has_render:
return None
else:
default_value = element._default_value
options = element.options['cust_field'] if 'cust_field' in element.options else None
default_value = None
msg_options = element.options['cust_msg'] if 'cust_msg' in element.options else None
init_from_mac = getattr(options,'init_from_mac', False) or getattr(msg_options,'init_from_mac', False)
default_value = getattr(options,'default_value', None)
global_name = getattr(options,'global_name', None)
const_prefix = getattr(options,'const_prefix', self.param_dict['const_prefix'])
if init_from_mac:
default_value = f'{const_prefix}@@init_from_mac@@'
elif default_value:
if element.descriptor.cpp_type == FieldDescriptor.CPPTYPE_STRING:
default_value = default_value
elif element.descriptor.cpp_type == FieldDescriptor.CPPTYPE_ENUM:
elif element.descriptor.cpp_type == FieldDescriptor.CPPTYPE_STRING:
default_value = default_value = getattr(options,'v_string', None)
elif element.descriptor.cpp_type == FieldDescriptor.CPPTYPE_ENUM:
if options is not None:
try:
default_value = element.enum_values.index(default_value)
except:
enum_value = getattr(options,'v_enum', None) or getattr(options,'v_string', None)
if enum_value is not None:
default_value = element.enum_values.index(enum_value)
except:
raise ValueError(f'Invalid default value {default_value} for {element.path}')
elif element.descriptor.cpp_type in [FieldDescriptor.CPPTYPE_INT32, FieldDescriptor.CPPTYPE_INT64,
FieldDescriptor.CPPTYPE_UINT32, FieldDescriptor.CPPTYPE_UINT64]:
int_value = int(default_value)
# Handling integer types
elif element.descriptor.cpp_type in [FieldDescriptor.CPPTYPE_INT32, FieldDescriptor.CPPTYPE_INT64,
FieldDescriptor.CPPTYPE_UINT32, FieldDescriptor.CPPTYPE_UINT64]:
if element.descriptor.cpp_type in [FieldDescriptor.CPPTYPE_INT32, FieldDescriptor.CPPTYPE_INT64]:
default_value = getattr(options, 'v_int32', getattr(options, 'v_int64', None))
else:
default_value = getattr(options, 'v_uint32', getattr(options, 'v_uint64', None))
if default_value is not None:
int_value= int(default_value)
if element.descriptor.cpp_type in [FieldDescriptor.CPPTYPE_UINT32, FieldDescriptor.CPPTYPE_UINT64] and int_value < 0:
raise ValueError(f"Negative value for unsigned int type trying to assign {element.path} = {default_value}")
default_value = int_value
elif element.descriptor.cpp_type in [FieldDescriptor.CPPTYPE_DOUBLE, FieldDescriptor.CPPTYPE_FLOAT]:
# Handling float and double types
elif element.descriptor.cpp_type in [FieldDescriptor.CPPTYPE_DOUBLE, FieldDescriptor.CPPTYPE_FLOAT]:
default_value = getattr(options, 'v_double', getattr(options, 'v_float', None))
if default_value is not None:
float_value = float(default_value)
if '.' not in default_value:
if '.' not in str(default_value):
raise ValueError(f"Integer string for float/double type trying to assign {element.path} = {default_value}")
default_value = float_value
elif element.descriptor.cpp_type == FieldDescriptor.CPPTYPE_BOOL:
if default_value.lower() in ['true', 'false']:
default_value = default_value.lower() == 'true'
else:
raise ValueError(f'Invalid boolean value trying to assign {element.path} = {default_value}')
if default_value:
# Handling boolean type
elif element.descriptor.cpp_type == FieldDescriptor.CPPTYPE_BOOL:
if options is not None:
default_value = getattr(options, 'v_bool', False)
if isinstance(default_value, str):
if default_value.lower() in ['true', 'false']:
default_value = default_value.lower() == 'true'
else:
raise ValueError(f'Invalid boolean value trying to assign {element.path} = {default_value}')
# Handling bytes type
elif element.descriptor.cpp_type == FieldDescriptor.CPPTYPE_BYTES:
default_value = getattr(options, 'v_bytes', b'')
elif element.descriptor.cpp_type == FieldDescriptor.TYPE_MESSAGE:
pass
if default_value is not None:
element.message_instance.SetInParent()
return self.repeated_render(element,default_value) if default_value else None
return self.repeated_render(element, default_value)
else:
return None
return element.message_instance
if __name__ == '__main__':

View File

@@ -1,4 +1,4 @@
# !/usr/bin/env python
#!/opt/esp/python_env/idf4.4_py3.8_env/bin/python
import argparse
import sys
import os
@@ -25,7 +25,7 @@ logging.basicConfig(
def process(
request: plugin.CodeGeneratorRequest, response: CodeGeneratorResponse, data
) -> None:
logger.info(f'Received parameter(s): {request.parameter}')
logger.debug(f'Received parameter(s): {request.parameter}')
params = request.parameter.split(',')
param_dict = {p.split('=')[0]: parse.unquote(p.split('=')[1]) for p in params if '=' in p}
param_dict['path'] = param_dict['path'].split('?')

View File

@@ -1,4 +1,4 @@
# !/usr/bin/env python
#!/opt/esp/python_env/idf4.4_py3.8_env/bin/python
import os
import logging
@@ -23,7 +23,7 @@ class JsonParser(ProtocParser) :
super().end_message(message)
jsonmessage = message.render()
respfile = self.response.file.add()
respfile.name = f'{message.name}_pb2.json'
respfile.name = f'{message.fullname}_pb2.json'
logger.info(f"Creating new template json file: {respfile.name}")
respfile.content = json.dumps(jsonmessage, indent=2) + "\r\n"
@@ -73,7 +73,7 @@ class JsonParser(ProtocParser) :
if __name__ == '__main__':
data = ProtocParser.get_data()
logger.debug(f"Generating blank json file(s)")
logger.info(f"Generating blank json file(s)")
protocParser:JsonParser = JsonParser(data)
protocParser.process()
logger.debug('Done generating JSON file(s)')

View File

@@ -1,10 +1,11 @@
# !/usr/bin/env python
#!/opt/esp/python_env/idf4.4_py3.8_env/bin/python
import io
import os
import logging
import json
from pathlib import Path
import sys
from typing import Dict, List
from google.protobuf.compiler import plugin_pb2 as plugin
from google.protobuf.descriptor_pb2 import FileDescriptorProto, DescriptorProto, FieldDescriptorProto,FieldOptions
@@ -59,6 +60,7 @@ class OptionsParser(ProtocParser) :
def render_all_members(self):
for key in [key for key in self.all_members_defaults.keys() if not '.' in key and not key.startswith(PROCESSED_PREFIX)]:
# make each line unique
self.all_members_defaults[key] = set(self.all_members_defaults[key])
@@ -66,9 +68,12 @@ class OptionsParser(ProtocParser) :
# WRITE DEPENDENCIES FOR THE CURRENT FILE
member_defines = '\n'.join([ self.all_members_defaults.get(key) for key in self.all_members_defaults.keys() if '.' in key])
self.c_header.writelines(member_defines)
try:
member_defines = '\n'.join([ self.all_members_defaults.get(key) for key in self.all_members_defaults.keys() if '.' in key])
self.c_header.writelines(member_defines)
except Exception as e:
logger.error(f'{e}')
sys.exit(1) # Exit with error status
message_defines = ',\\\n'.join([key for key in self.all_members_defaults.keys() if not '.' in key])
self.c_header.writelines(message_defines)
@@ -213,7 +218,7 @@ class OptionsParser(ProtocParser) :
self.get_mkey(element.cpp_type).append(f'#define {member_prefix} {opt_member_type}({init_from_mac_str}, {const_prefix_str}, {read_only_str}, {default_value_str}, {global_name_str})')
if element.detached_leading_comments:
logger.debug(f'{element.detached_leading_comments}')
logger.info(f'INITFROMMAC: {self.global_name}{element.path}')
logger.info(f'INITFROMMAC: {self.global_name}/{element.path}')
self.get_mkey(element.cpp_child).append(f'{opt_member}(msg, member.{element.cpp_member},{opt_member_type}) ')
self.get_mkey(element.cpp_root).append(f'{opt_member}({element.cpp_type},{element.cpp_member},{opt_member_type})')