em: initialize include script

Implement go/entity-manager-break-down-design-doc: add a preprocessing
script, run at install time, that lets entity-manager configs include
common JSON configs.

Fusion-Link:
platform5: fusion2/1836c139-9877-3e9d-884e-03f10bef4f61
platform11: fusion2/6f62cff7-f86c-30bd-a4d5-e0d459612452
platform15: fusion2/ac594284-a9d6-3368-be4c-aa83a04779ef
platform17: fusion2/03356b6d-7766-3ec4-b024-e0b34ec1aed9
Tested: unit tests pass
Google-Bug-Id: 441445178
Change-Id: I20bd59c66bae67e125a91f957a618d2cf7a27db5
Signed-off-by: Alex Lai <alexlai@google.com>
diff --git a/recipes-phosphor/configuration/entity-manager/scripts/config_preprocess.py b/recipes-phosphor/configuration/entity-manager/scripts/config_preprocess.py
new file mode 100644
index 0000000..b33add0
--- /dev/null
+++ b/recipes-phosphor/configuration/entity-manager/scripts/config_preprocess.py
@@ -0,0 +1,165 @@
+#!/usr/bin/python3
+
+import argparse
+import json
+import re
+import pathlib
+import sys
+from typing import Any
+import os
+
+_INCLUDE_KEYWORD = "include"
+
+
+# https://github.com/openbmc/entity-manager/blob/213b397d35168ef975a2bf5b6de7492dc00439c6/scripts/validate_configs.py#L34C1-L46C40
+def remove_c_comments(string: str) -> str:
+  """Strips // and /* */ comments from text, leaving quoted strings intact.
+
+  Quoted strings are matched first so a "//" inside one is not a comment.
+  """
+  # Group 1: a double- or single-quoted string, closed by an unescaped quote.
+  # Group 2: a /* ... */ comment (may span lines) or a // comment to line end.
+  pattern = r"(\".*?(?<!\\)\"|\'.*?(?<!\\)\')|(/\*.*?\*/|//[^\r\n]*$)"
+  matcher = re.compile(pattern, flags=re.MULTILINE | re.DOTALL)
+
+  def _keep_strings(found: re.Match) -> str:
+    # Comments become the empty string; quoted strings are put back verbatim.
+    return "" if found.group(2) is not None else found.group(1)
+
+  return matcher.sub(_keep_strings, string)
+
+
+def _warn(message: str) -> None:
+  """Prints a diagnostic about a skipped include to stderr."""
+  print(message, file=sys.stderr)
+
+
+def process_includes(
+    data: dict[str, Any] | list[Any],
+    include_folder: pathlib.Path,
+    visited_paths: set[pathlib.Path] | None = None,
+) -> None:
+  """Expands "include" directives in parsed config data, in place.
+
+  A string-valued "include" key in a dict is removed and the keys of the JSON
+  object it names are merged in, overwriting existing keys. A list item of
+  the form {"include": "<file>"} is replaced by the elements of the JSON array
+  it names. Missing, malformed, wrongly typed or circular includes are dropped.
+
+  Args:
+    data: Parsed JSON container; modified in place.
+    include_folder: Directory that include file names are relative to.
+    visited_paths: Include files being expanded now; detects circular includes.
+  """
+  # TODO(alexlai): Include files can be cached.
+  if visited_paths is None:
+    visited_paths = set()
+
+  if isinstance(data, dict):
+    if isinstance(data.get(_INCLUDE_KEYWORD), str):
+      # Remove the directive up front so it never reaches the output.
+      include_path = include_folder / data.pop(_INCLUDE_KEYWORD)
+      if include_path in visited_paths:
+        _warn(f"Circular include detected, skipping: {include_path}")
+      else:
+        visited_paths.add(include_path)
+        try:
+          with open(include_path, "r") as include_file:
+            include_data = json.loads(remove_c_comments(include_file.read()))
+          process_includes(include_data, include_folder, visited_paths)
+          if isinstance(include_data, dict):
+            data.update(include_data)
+          else:
+            _warn(f"Failed to include {include_path}: not a JSON object")
+        except FileNotFoundError as e:
+          _warn(f"Include file not found {include_path}: {e}")
+        except json.JSONDecodeError:
+          _warn(f"Failed to parse include file: {include_path}")
+        finally:
+          visited_paths.remove(include_path)
+    # Recurse even after a skipped include so nested containers are expanded.
+    for value in data.values():
+      if isinstance(value, (dict, list)):
+        process_includes(value, include_folder, visited_paths)
+  elif isinstance(data, list):
+    new_data = []
+    for item in data:
+      # Only {"include": "<str>"} is a list include; anything else is kept.
+      if not (
+          isinstance(item, dict)
+          and len(item) == 1
+          and isinstance(item.get(_INCLUDE_KEYWORD), str)
+      ):
+        if isinstance(item, (dict, list)):
+          process_includes(item, include_folder, visited_paths)
+        new_data.append(item)
+        continue
+      include_path = include_folder / item[_INCLUDE_KEYWORD]
+      if include_path in visited_paths:
+        _warn(f"Circular include detected, skipping: {include_path}")
+        continue
+      visited_paths.add(include_path)
+      try:
+        with open(include_path, "r") as include_file:
+          include_data = json.loads(remove_c_comments(include_file.read()))
+        if isinstance(include_data, list):
+          process_includes(include_data, include_folder, visited_paths)
+          new_data.extend(include_data)
+        else:
+          _warn(
+              "JSON included into an array must be an array itself: "
+              f"{include_path}"
+          )
+      except FileNotFoundError:
+        _warn(f"Include file not found: {include_path}")
+      except json.JSONDecodeError as e:
+        _warn(f"Error decoding JSON from {include_path}: {e}")
+      finally:
+        visited_paths.remove(include_path)
+    data[:] = new_data
+
+
+def process_config_file(
+    input_config_filename: pathlib.Path, args: argparse.Namespace) -> None:
+  """Expands includes in one config and writes it to args.output_folder."""
+  data = json.loads(remove_c_comments(input_config_filename.read_text()))
+  process_includes(data, args.include_folder)
+  # Create the destination on demand instead of failing when it is missing.
+  args.output_folder.mkdir(parents=True, exist_ok=True)
+  output_config_filename = args.output_folder / input_config_filename.name
+  with open(output_config_filename, "w") as output_config_file:
+    json.dump(data, output_config_file, indent=4)
+  os.chmod(output_config_filename, 0o644)
+
+
+def main(args: argparse.Namespace) -> None:
+  for config_path in args.input_folder.glob("*.json"):  # Not recursive.
+    process_config_file(config_path, args)
+
+
+def parse_args():
+  """Builds the command line parser and parses sys.argv.
+
+  Returns:
+    argparse.Namespace with pathlib.Path attributes input_folder,
+    include_folder and output_folder; all three flags are required.
+  """
+  parser = argparse.ArgumentParser(description="Process entity manager")
+  # Every option is a mandatory path, so they are declared as a table.
+  folder_flags = (
+      ("--input_folder", "Input JSON folder"),
+      ("--include_folder", "Include JSON folder"),
+      ("--output_folder", "Output JSON folder"),
+  )
+  for flag, help_text in folder_flags:
+    parser.add_argument(
+        flag,
+        required=True,
+        type=pathlib.Path,
+        help=help_text,
+    )
+  return parser.parse_args()
+
+if __name__ == "__main__":
+  # Run only when executed as a script, not when imported by the tests.
+  main(parse_args())
diff --git a/recipes-phosphor/configuration/entity-manager/scripts/config_preprocess_test.py b/recipes-phosphor/configuration/entity-manager/scripts/config_preprocess_test.py
new file mode 100644
index 0000000..b14b21e
--- /dev/null
+++ b/recipes-phosphor/configuration/entity-manager/scripts/config_preprocess_test.py
@@ -0,0 +1,161 @@
+import unittest
+import pathlib
+import tempfile
+import json
+import io
+from unittest import mock
+import argparse
+
+import config_preprocess
+
+
+class ProcessTest(unittest.TestCase):
+  """Tests for config_preprocess comment stripping and include expansion."""
+
+  def test_remove_c_comments(self):
+    """Tests the remove_c_comments function."""
+    self.assertEqual(
+        config_preprocess.remove_c_comments('{"key": "value" // comment\n}'),
+        '{"key": "value" \n}',
+    )
+    self.assertEqual(
+        config_preprocess.remove_c_comments('{"key": "value" /* comment */}'),
+        '{"key": "value" }',
+    )
+    self.assertEqual(
+        config_preprocess.remove_c_comments(
+            '{"key": "value" /* multi\nline\ncomment */}'
+        ),
+        '{"key": "value" }',
+    )
+    self.assertEqual(
+        config_preprocess.remove_c_comments('{"key": "http://google.com"}'),
+        '{"key": "http://google.com"}',
+    )
+    self.assertEqual(config_preprocess.remove_c_comments("no comments"), "no comments")
+
+  def test_process_includes_dict(self):
+    """Tests process_includes with dictionary includes."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+      include_folder = pathlib.Path(tmpdir)
+      include_file = include_folder / "include.json"
+      with open(include_file, "w") as f:
+        json.dump({"included_key": "included_value"}, f)
+
+      data = {"key": "value", "include": "include.json"}
+      config_preprocess.process_includes(data, include_folder)
+      self.assertEqual(data, {"key": "value", "included_key": "included_value"})
+
+  def test_process_includes_list(self):
+    """Tests process_includes with list includes."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+      include_folder = pathlib.Path(tmpdir)
+      include_file = include_folder / "include.json"
+      with open(include_file, "w") as f:
+        json.dump([{"item": 2}, {"item": 3}], f)
+
+      data = [{"item": 1}, {"include": "include.json"}, {"item": 4}]
+      config_preprocess.process_includes(data, include_folder)
+      self.assertEqual(
+          data, [{"item": 1}, {"item": 2}, {"item": 3}, {"item": 4}]
+      )
+
+  def test_process_includes_dict_includes_list(self):
+    """Tests that a dict including a JSON list only loses the directive."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+      include_folder = pathlib.Path(tmpdir)
+      include_file = include_folder / "include.json"
+      with open(include_file, "w") as f:
+        json.dump([{"item": 1}, {"item": 2}], f)
+
+      data = {"key": "value", "include": "include.json"}
+      config_preprocess.process_includes(data, include_folder)
+
+      # A list cannot be merged into a dict, so nothing is added to data.
+      self.assertEqual(data, {"key": "value"})
+
+  def test_process_includes_circular_dependency(self):
+    """Tests detection of circular dependencies in includes."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+      include_folder = pathlib.Path(tmpdir)
+      file_a = include_folder / "a.json"
+      file_b = include_folder / "b.json"
+      with open(file_a, "w") as f:
+        json.dump({"include": "b.json", "from": "a"}, f)
+      with open(file_b, "w") as f:
+        json.dump({"include": "a.json", "from": "b"}, f)
+
+      data = {"include": "a.json"}
+      with mock.patch("sys.stderr", new_callable=io.StringIO) as mock_stderr:
+        config_preprocess.process_includes(data, include_folder)
+        self.assertIn("Circular include detected", mock_stderr.getvalue())
+      # a.json pulls in b.json, whose include of a.json is skipped; b's "from"
+      # then overwrites a's when it is merged.
+      self.assertEqual(data, {"from": "b"})
+
+  def test_process_includes_chained_dependency(self):
+    """Tests that includes chained three files deep are all expanded."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+      include_folder = pathlib.Path(tmpdir)
+      file_a = include_folder / "a.json"
+      file_b = include_folder / "b.json"
+      file_c = include_folder / "c.json"
+      with open(file_a, "w") as f:
+        json.dump({"include": "b.json", "a": True}, f)
+      with open(file_b, "w") as f:
+        json.dump({"include": "c.json", "b": True}, f)
+      with open(file_c, "w") as f:
+        json.dump({"included": True, "c": True}, f)
+
+      data = {"include": "a.json"}
+      config_preprocess.process_includes(data, include_folder)
+      self.assertEqual(
+          data, {"a": True, "b": True, "c": True, "included": True}
+      )
+
+  def test_process_includes_file_not_found(self):
+    """Tests process_includes with a non-existent include file."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+      include_folder = pathlib.Path(tmpdir)
+      data = {"include": "nonexistent.json"}
+      with mock.patch("sys.stderr", new_callable=io.StringIO) as mock_stderr:
+        config_preprocess.process_includes(data, include_folder)
+        self.assertIn("Include file not found", mock_stderr.getvalue())
+      self.assertEqual(data, {})
+
+  def test_process_config_file(self):
+    """Tests the process_config_file function."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+      tmpdir_path = pathlib.Path(tmpdir)
+      input_folder = tmpdir_path / "input"
+      include_folder = tmpdir_path / "include"
+      output_folder = tmpdir_path / "output"
+      input_folder.mkdir()
+      include_folder.mkdir()
+      output_folder.mkdir()
+
+      # Create include file
+      include_file = include_folder / "include.json"
+      with open(include_file, "w") as f:
+        json.dump({"included": True}, f)
+
+      # Create input file
+      input_file = input_folder / "config.json"
+      with open(input_file, "w") as f:
+        json.dump({"a": 1, "include": "include.json"}, f)
+
+      args = argparse.Namespace(
+          include_folder=include_folder, output_folder=output_folder
+      )
+
+      config_preprocess.process_config_file(input_file, args)
+
+      output_file = output_folder / "config.json"
+      self.assertTrue(output_file.exists())
+      with open(output_file, "r") as f:
+        output_data = json.load(f)
+      self.assertEqual(output_data, {"a": 1, "included": True})
+
+
+if __name__ == "__main__":
+  unittest.main()  # Run every test in this file when executed as a script.
diff --git a/recipes-phosphor/configuration/entity-manager_%.bbappend b/recipes-phosphor/configuration/entity-manager_%.bbappend
index 53becc5..75f4557 100644
--- a/recipes-phosphor/configuration/entity-manager_%.bbappend
+++ b/recipes-phosphor/configuration/entity-manager_%.bbappend
@@ -15,6 +15,8 @@
     file://0001-Strip-extra-spaces-from-fru.patch \
     file://0001-Add-PropagateElementTo-Property-in-EM-Configs.patch \
     file://0008-Add-excludedProps-to-filter-out-tlbmc-information.patch \
+    file://scripts/config_preprocess.py \
+    file://scripts/config_preprocess_test.py \
 "
 
 USB_DEVICE_OPTION:gbmc := "-Dusb-device=true, -Dusb-device=false, stdplus"
@@ -22,3 +24,13 @@
 
 SYSTEMD_SERVICE:${PN}:append:gbmc = " ${@bb.utils.contains('PACKAGECONFIG', 'usb-device', 'xyz.openbmc_project.UsbDevice.service', '', d)}"
 
+# Run the config_preprocess unit tests as part of do_check.
+do_check:append:gbmc() {
+    python3 ${WORKDIR}/scripts/config_preprocess_test.py
+}
+
+# Expand "include" directives in the JSON configs and install the results.
+do_install:append:gbmc() {
+    CONFIG_DIR="${D}${datadir}/${PN}/configurations"
+    python3 ${WORKDIR}/scripts/config_preprocess.py --input_folder=${WORKDIR} --include_folder=${WORKDIR}/include --output_folder=${CONFIG_DIR}
+}