Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 46 additions & 9 deletions codeanalyzer/__main__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import sys
from contextlib import nullcontext
from pathlib import Path
from typing import Annotated, Optional
from enum import Enum

import typer

from codeanalyzer.core import AnalyzerCore
from codeanalyzer.utils import _set_log_level, logger


class OutputFormat(str, Enum):
JSON = "json"
MSGPACK = "msgpack"


def main(
input: Annotated[
Path, typer.Option("-i", "--input", help="Path to the project root directory.")
Expand All @@ -17,6 +21,15 @@ def main(
Optional[Path],
typer.Option("-o", "--output", help="Output directory for artifacts."),
] = None,
format: Annotated[
OutputFormat,
typer.Option(
"-f",
"--format",
help="Output format: json or msgpack.",
case_sensitive=False,
),
] = OutputFormat.JSON,
analysis_level: Annotated[
int,
typer.Option("-a", "--analysis-level", help="1: symbol table, 2: call graph."),
Expand Down Expand Up @@ -58,16 +71,40 @@ def main(
input, analysis_level, using_codeql, rebuild_analysis, cache_dir, clear_cache
) as analyzer:
artifacts = analyzer.analyze()
print_stream = sys.stdout
stream_context = nullcontext(print_stream)

if output is not None:
# Handle output based on format
if output is None:
# Output to stdout (only for JSON)
if format == OutputFormat.JSON:
print(artifacts.model_dump_json(separators=(",", ":")))
else:
logger.error(
f"Format '{format.value}' requires an output directory (use -o/--output)"
)
raise typer.Exit(code=1)
else:
# Output to file
output.mkdir(parents=True, exist_ok=True)
output_file = output / "analysis.json"
stream_context = output_file.open("w")
_write_output(artifacts, output, format)


def _write_output(artifacts, output_dir: Path, format: OutputFormat):
"""Write artifacts to file in the specified format."""
if format == OutputFormat.JSON:
output_file = output_dir / "analysis.json"
with output_file.open("w") as f:
f.write(artifacts.model_dump_json(separators=(",", ":")))
logger.info(f"Analysis saved to {output_file}")

with stream_context as f:
print(artifacts.model_dump_json(indent=4), file=f)
elif format == OutputFormat.MSGPACK:
output_file = output_dir / "analysis.msgpack"
msgpack_data = artifacts.to_msgpack_bytes()
with output_file.open("wb") as f:
f.write(msgpack_data)
logger.info(f"Analysis saved to {output_file}")
logger.info(
f"Compression ratio: {artifacts.get_compression_ratio():.1%} of JSON size"
)


app = typer.Typer(
Expand Down
218 changes: 98 additions & 120 deletions codeanalyzer/schema/py_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,85 @@
import inspect
from pathlib import Path
from typing import Any, Dict, List, Optional
import gzip

from pydantic import BaseModel
from typing_extensions import Literal
import msgpack


def msgpk(cls):
"""
Decorator that adds MessagePack serialization methods to Pydantic models.

Adds methods:
- to_msgpack_bytes() -> bytes: Serialize to compact binary format
- from_msgpack_bytes(data: bytes) -> cls: Deserialize from binary format
- to_msgpack_dict() -> dict: Convert to msgpack-compatible dict
- from_msgpack_dict(data: dict) -> cls: Create instance from msgpack dict
"""

def _prepare_for_serialization(obj: Any) -> Any:
"""Convert objects to serialization-friendly format."""
if isinstance(obj, Path):
return str(obj)
elif isinstance(obj, dict):
return {
_prepare_for_serialization(k): _prepare_for_serialization(v)
for k, v in obj.items()
}
elif isinstance(obj, list):
return [_prepare_for_serialization(item) for item in obj]
elif isinstance(obj, tuple):
return tuple(_prepare_for_serialization(item) for item in obj)
elif isinstance(obj, set):
return [_prepare_for_serialization(item) for item in obj]
elif hasattr(obj, "model_dump"): # Pydantic model
return _prepare_for_serialization(obj.model_dump())
else:
return obj

def to_msgpack_bytes(self) -> bytes:
"""Serialize the model to compact binary format using MessagePack + gzip."""
data = _prepare_for_serialization(self.model_dump())
msgpack_data = msgpack.packb(data, use_bin_type=True)
return gzip.compress(msgpack_data)

@classmethod
def from_msgpack_bytes(cls_obj, data: bytes):
"""Deserialize from MessagePack + gzip binary format."""
decompressed_data = gzip.decompress(data)
obj_dict = msgpack.unpackb(decompressed_data, raw=False)
return cls_obj.model_validate(obj_dict)

def to_msgpack_dict(self) -> dict:
"""Convert to msgpack-compatible dictionary format."""
return _prepare_for_serialization(self.model_dump())

@classmethod
def from_msgpack_dict(cls_obj, data: dict):
"""Create instance from msgpack-compatible dictionary."""
return cls_obj.model_validate(data)

def get_msgpack_size(self) -> int:
"""Get the size of the msgpack serialization in bytes."""
return len(self.to_msgpack_bytes())

def get_compression_ratio(self) -> float:
"""Get compression ratio compared to JSON."""
json_size = len(self.model_dump_json().encode("utf-8"))
msgpack_gzip_size = self.get_msgpack_size()
return msgpack_gzip_size / json_size if json_size > 0 else 1.0

# Add methods to the class
cls.to_msgpack_bytes = to_msgpack_bytes
cls.from_msgpack_bytes = from_msgpack_bytes
cls.to_msgpack_dict = to_msgpack_dict
cls.from_msgpack_dict = from_msgpack_dict
cls.get_msgpack_size = get_msgpack_size
cls.get_compression_ratio = get_compression_ratio

return cls


def builder(cls):
Expand Down Expand Up @@ -92,26 +168,9 @@ def build(self):


@builder
@msgpk
class PyImport(BaseModel):
"""Represents a Python import statement.

Attributes:
module (str): The name of the module being imported.
name (str): The name of the imported entity (e.g., function, class).
alias (Optional[str]): An optional alias for the imported entity.
start_line (int): The line number where the import statement starts.
end_line (int): The line number where the import statement ends.
start_column (int): The starting column of the import statement.
end_column (int): The ending column of the import statement.

Example:
- import numpy as np will be represented as:
PyImport(module="numpy", name="np", alias="np", start_line=1, end_line=1, start_column=0, end_column=16)
- from math import sqrt will be represented as:
PyImport(module="math", name="sqrt", alias=None, start_line=2, end_line=2, start_column=0, end_column=20
- from os.path import join as path_join will be represented as:
PyImport(module="os.path", name="path_join", alias="join", start_line=3, end_line=3, start_column=0, end_column=30)
"""
"""Represents a Python import statement."""

module: str
name: str
Expand All @@ -123,18 +182,9 @@ class PyImport(BaseModel):


@builder
@msgpk
class PyComment(BaseModel):
"""
Represents a Python comment.

Attributes:
content (str): The actual comment string (without the leading '#').
start_line (int): The line number where the comment starts.
end_line (int): The line number where the comment ends (same as start_line for single-line comments).
start_column (int): The starting column of the comment.
end_column (int): The ending column of the comment.
is_docstring (bool): Whether this comment is actually a docstring (triple-quoted string).
"""
"""Represents a Python comment."""

content: str
start_line: int = -1
Expand All @@ -145,20 +195,9 @@ class PyComment(BaseModel):


@builder
@msgpk
class PySymbol(BaseModel):
"""
Represents a symbol used or declared in Python code.

Attributes:
name (str): The name of the symbol (e.g., 'x', 'self.x', 'os.path').
scope (Literal['local', 'nonlocal', 'global', 'class', 'module']): The scope where the symbol is accessed.
kind (Literal['variable', 'parameter', 'attribute', 'function', 'class', 'module']): The kind of symbol.
type (Optional[str]): Inferred or annotated type, if available.
qualified_name (Optional[str]): Fully qualified name (e.g., 'self.x', 'os.path.join').
is_builtin (bool): Whether this is a Python builtin.
lineno (int): Line number where the symbol is accessed or declared.
col_offset (int): Column offset.
"""
"""Represents a symbol used or declared in Python code."""

name: str
scope: Literal["local", "nonlocal", "global", "class", "module"]
Expand All @@ -171,11 +210,9 @@ class PySymbol(BaseModel):


@builder
@msgpk
class PyVariableDeclaration(BaseModel):
"""Represents a Python variable declaration.

Attributes:
"""
"""Represents a Python variable declaration."""

name: str
type: Optional[str]
Expand All @@ -189,18 +226,9 @@ class PyVariableDeclaration(BaseModel):


@builder
@msgpk
class PyCallableParameter(BaseModel):
"""Represents a parameter of a Python callable (function/method).

Attributes:
name (str): The name of the parameter.
type (str): The type of the parameter.
default_value (str): The default value of the parameter, if any.
start_line (int): The line number where the parameter is defined.
end_line (int): The line number where the parameter definition ends.
start_column (int): The column number where the parameter starts.
end_column (int): The column number where the parameter ends.
"""
"""Represents a parameter of a Python callable (function/method)."""

name: str
type: Optional[str] = None
Expand All @@ -212,10 +240,9 @@ class PyCallableParameter(BaseModel):


@builder
@msgpk
class PyCallsite(BaseModel):
"""
Represents a Python call site (function or method invocation) with contextual metadata.
"""
"""Represents a Python call site (function or method invocation) with contextual metadata."""

method_name: str
receiver_expr: Optional[str] = None
Expand All @@ -231,26 +258,9 @@ class PyCallsite(BaseModel):


@builder
@msgpk
class PyCallable(BaseModel):
"""Represents a Python callable (function/method).

Attributes:
name (str): The name of the callable.
signature (str): The fully qualified name of the callable (e.g., module.function_name).
docstring (PyComment): The docstring of the callable.
decorators (List[str]): List of decorators applied to the callable.
parameters (List[PyCallableParameter]): List of parameters for the callable.
return_type (Optional[str]): The type of the return value, if specified.
code (str): The actual code of the callable.
start_line (int): The line number where the callable is defined.
end_line (int): The line number where the callable definition ends.
code_start_line (int): The line number where the code block starts.
accessed_symbols (List[str]): Symbols accessed within the callable.
call_sites (List[str]): Call sites of this callable.
is_entrypoint (bool): Whether this callable is an entry point.
local_variables (List[PyVariableDeclaration]): Local variables within the callable.
cyclomatic_complexity (int): Cyclomatic complexity of the callable.
"""
"""Represents a Python callable (function/method)."""

name: str
path: str
Expand All @@ -274,16 +284,9 @@ def __hash__(self) -> int:


@builder
@msgpk
class PyClassAttribute(BaseModel):
"""Represents a Python class attribute.

Attributes:
name (str): The name of the attribute.
type (str): The type of the attribute.
docstring (PyComment): The docstring of the attribute.
start_line (int): The line number where the attribute is defined.
end_line (int): The line number where the attribute definition ends.
"""
"""Represents a Python class attribute."""

name: str
type: Optional[str] = None
Expand All @@ -293,20 +296,9 @@ class PyClassAttribute(BaseModel):


@builder
@msgpk
class PyClass(BaseModel):
"""Represents a Python class.

Attributes:
name (str): The name of the class.
signature (str): The fully qualified name of the class (e.g., module.class_name).
docstring (PyComment): The docstring of the class.
base_classes (List[str]): List of base class names.
methods (Dict[str, PyCallable]): Mapping of method names to their callable representations.
attributes (Dict[str, PyClassAttribute]): Mapping of attribute names to their variable declarations.
inner_classes (Dict[str, "PyClass"]): Mapping of inner class names to their class representations.
start_line (int): The line number where the class definition starts.
end_line (int): The line number where the class definition ends.
"""
"""Represents a Python class."""

name: str
signature: str # e.g., module.class_name
Expand All @@ -325,18 +317,9 @@ def __hash__(self):


@builder
@msgpk
class PyModule(BaseModel):
"""Represents a Python module.

Attributes:
file_path (str): The file path of the module.
module_name (str): The name of the module (e.g., module.submodule).
imports (List[PyImport]): List of import statements in the module.
comments (List[PyComment]): List of comments in the module.
classes (Dict[str, PyClass]): Mapping of class names to their class representations.
functions (Dict[str, PyCallable]): Mapping of function names to their callable representations.
variables (List[PyVariableDeclaration]): List of variable declarations in the module.
"""
"""Represents a Python module."""

file_path: str
module_name: str
Expand All @@ -348,13 +331,8 @@ class PyModule(BaseModel):


@builder
@msgpk
class PyApplication(BaseModel):
"""Represents a Python application.

Attributes:
name (str): The name of the application.
version (str): The version of the application.
description (str): A brief description of the application.
"""
"""Represents a Python application."""

symbol_table: dict[Path, PyModule]
Loading
Loading