"""OpenAPI/Swagger specification parser for dynamic endpoint generation.
This module provides functionality to parse OpenAPI 3.0 specifications
and extract endpoint definitions for automatic API method generation.
"""
import json
import keyword
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

from ABConnect.common import load_json_resource
@dataclass
class Parameter:
    """Represents an API parameter from an OpenAPI specification.

    Attributes:
        name: Parameter name exactly as written in the specification.
        location: Where the parameter is sent: 'path', 'query', 'header'
            or 'cookie'.
        required: Whether the parameter must be supplied.
        schema: Raw OpenAPI schema object describing the value.
        description: Human-readable description, if any.
        default: Default value for the parameter, or None.
    """

    name: str
    location: str  # 'path', 'query', 'header', 'cookie'
    required: bool = False
    schema: Dict[str, Any] = field(default_factory=dict)
    description: Optional[str] = None
    default: Optional[Any] = None

    @property
    def python_name(self) -> str:
        """Convert the parameter name to a valid snake_case Python identifier.

        Runs of non-word characters (``-``, ``.``, whitespace, ``$``,
        ``[``, ...) become a single underscore, camelCase boundaries are
        split, and the result is lower-cased.  A name that would start with
        a digit (or be empty) gets a leading underscore, and a name that
        collides with a Python keyword gets a trailing underscore.
        """
        # Collapse every separator / illegal character run into one underscore.
        name = re.sub(r'\W+', '_', self.name)
        # Convert camelCase to snake_case.
        name = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', name)
        name = name.lower()
        # Identifiers may not be empty or start with a digit.
        if not name or name[0].isdigit():
            name = f'_{name}'
        # 'from', 'in', 'class', ... cannot be used as argument names.
        if keyword.iskeyword(name):
            name = f'{name}_'
        return name

    @property
    def python_type(self) -> str:
        """Get a Python type hint (as a string) from the OpenAPI schema.

        Returns 'Any' when there is no schema or the type is not one of the
        basic OpenAPI types.  A schema without a ``type`` key is treated as
        'string'.  Formats such as ``date-time``, ``uuid``, ``int64`` and
        ``double`` are not distinguished from their base type.
        """
        if not self.schema:
            return 'Any'

        schema_type = self.schema.get('type', 'string')

        # OpenAPI 3.1 allows a list of types, e.g. ["string", "null"].
        # Use the single non-null member; anything ambiguous becomes 'Any'.
        if isinstance(schema_type, (list, tuple)):
            non_null = [t for t in schema_type if t != 'null']
            schema_type = non_null[0] if len(non_null) == 1 else None

        # Guard the dict lookup against unhashable / unexpected values.
        if not isinstance(schema_type, str):
            return 'Any'

        type_mapping = {
            'string': 'str',
            'integer': 'int',
            'number': 'float',
            'boolean': 'bool',
            'array': 'List[Any]',
            'object': 'Dict[str, Any]',
        }
        return type_mapping.get(schema_type, 'Any')
@dataclass
class RequestBody:
    """Represents an API request body from an OpenAPI specification.

    Attributes:
        content_types: Mapping of media type (e.g. 'application/json') to
            its media-type object from the specification.
        required: Whether the request body must be supplied.
        description: Human-readable description, if any.
    """

    content_types: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    required: bool = False
    description: Optional[str] = None

    @property
    def primary_content_type(self) -> str:
        """Return the media type to use, preferring 'application/json'.

        Falls back to the first declared media type, and to
        'application/json' when none are declared at all.
        """
        preferred = 'application/json'
        declared = list(self.content_types)
        if not declared or preferred in self.content_types:
            return preferred
        return declared[0]

    @property
    def schema(self) -> Optional[Dict[str, Any]]:
        """Return the schema declared for the primary media type, if any."""
        try:
            media = self.content_types[self.primary_content_type]
        except KeyError:
            # Nothing declared for the primary media type.
            return None
        return media.get('schema')
@dataclass
class EndpointDefinition:
    """Represents a single API endpoint definition.

    Attributes:
        path: URL path template, e.g. '/api/companies/{id}'.
        method: HTTP method of the operation.
        operation_id: The operation's ``operationId``, if any.
        summary: Short summary of the operation, if any.
        description: Longer description of the operation, if any.
        tags: Tags attached to the operation.
        parameters: Parameters accepted by the operation.
        request_body: Request body definition, if any.
        responses: Raw ``responses`` object keyed by status code.
        deprecated: Whether the operation is marked deprecated.
    """

    path: str
    method: str
    operation_id: Optional[str] = None
    summary: Optional[str] = None
    description: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    parameters: List["Parameter"] = field(default_factory=list)
    request_body: Optional["RequestBody"] = None
    responses: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    deprecated: bool = False

    def _path_segments(self) -> List[str]:
        """Return the non-empty path segments with a leading 'api' removed."""
        # Dropping empty segments keeps '/' and '/api//x' from yielding ''.
        segments = [segment for segment in self.path.split('/') if segment]
        if segments and segments[0] == 'api':
            segments = segments[1:]
        return segments

    @property
    def resource_name(self) -> str:
        """Extract the resource name from the tags or, failing that, the path.

        The first tag wins (lower-cased, spaces replaced by underscores).
        Otherwise the first path segment that is not a ``{parameter}`` is
        used, ignoring a leading 'api' segment.  Returns 'unknown' when
        neither source yields a name.
        """
        if self.tags:
            return self.tags[0].lower().replace(' ', '_')

        for segment in self._path_segments():
            if not segment.startswith('{'):
                return segment
        return 'unknown'

    @property
    def method_name(self) -> str:
        """Generate a snake_case Python method name for this endpoint.

        Uses ``operation_id`` when present.  Otherwise the name is built
        from the HTTP method (omitted for GET) followed by the literal path
        segments after the first one; 'list' is used when nothing remains.
        """
        if self.operation_id:
            name = self.operation_id
        else:
            verb = self.method.lower()
            # GET is the implicit default, so only other verbs are prefixed.
            parts = [] if verb == 'get' else [verb]
            # The first segment is the resource name; {parameters} are skipped.
            parts.extend(
                segment
                for segment in self._path_segments()[1:]
                if not segment.startswith('{')
            )
            name = '_'.join(parts) if parts else 'list'

        # Convert to snake_case.
        name = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', name)
        name = re.sub(r'[-.\s]+', '_', name)
        return name.lower()

    def get_path_parameters(self) -> List["Parameter"]:
        """Get only the parameters whose location is 'path'."""
        return [p for p in self.parameters if p.location == 'path']

    def get_query_parameters(self) -> List["Parameter"]:
        """Get only the parameters whose location is 'query'."""
        return [p for p in self.parameters if p.location == 'query']
class SwaggerParser:
    """Parser for OpenAPI/Swagger specifications.

    Attributes:
        spec: The full specification as loaded from JSON.
        paths: The ``paths`` object of the specification.
        components: The ``components`` object of the specification.
        schemas: The ``schemas`` object inside ``components``.
    """

    # Operation keys a Path Item Object may carry in OpenAPI 3.0.
    _HTTP_METHODS = (
        'get', 'post', 'put', 'patch', 'delete', 'head', 'options', 'trace',
    )

    def __init__(self, swagger_path: Optional[str] = None):
        """Initialize parser with swagger specification.

        Args:
            swagger_path: Path to swagger.json file. If None, uses the
                bundled swagger.json
        """
        if swagger_path:
            # Read as UTF-8 explicitly instead of relying on the platform's
            # default text encoding.
            with open(swagger_path, 'r', encoding='utf-8') as f:
                self.spec = json.load(f)
        else:
            # Use the bundled swagger.json
            self.spec = load_json_resource('swagger.json')

        self.paths = self.spec.get('paths', {})
        self.components = self.spec.get('components', {})
        self.schemas = self.components.get('schemas', {})

    def parse(self) -> Dict[str, List["EndpointDefinition"]]:
        """Parse all endpoints and group by resource.

        Returns:
            Dictionary mapping resource names to their endpoint definitions
        """
        endpoints_by_resource: Dict[str, List["EndpointDefinition"]] = {}

        for path, path_item in self.paths.items():
            if not isinstance(path_item, dict):
                # Nothing to parse for a null / malformed path item.
                continue

            # Parameters declared on the path item apply to every operation
            # defined under it.
            path_parameters = self._parse_parameters(
                path_item.get('parameters') or []
            )

            # Process each HTTP method present on the path item.
            for method in self._HTTP_METHODS:
                if method not in path_item:
                    continue
                endpoint = self._parse_endpoint(
                    path, method, path_item[method], path_parameters
                )
                # Group by resource.
                endpoints_by_resource.setdefault(
                    endpoint.resource_name, []
                ).append(endpoint)

        return endpoints_by_resource

    def _parse_endpoint(self, path: str, method: str, operation: Dict[str, Any],
                        path_parameters: List["Parameter"]) -> "EndpointDefinition":
        """Parse a single endpoint definition.

        Path-level parameters are inherited; an operation-level parameter
        with the same name and location replaces the inherited one instead
        of being listed twice.
        """
        # Keyed by (name, location); dict order keeps path-level parameters
        # first and leaves an overridden parameter in its original position.
        merged: Dict[Tuple[str, str], "Parameter"] = {
            (p.name, p.location): p for p in path_parameters
        }
        if 'parameters' in operation:
            for p in self._parse_parameters(operation['parameters']):
                merged[(p.name, p.location)] = p
        parameters = list(merged.values())

        # Parse request body
        request_body = None
        if 'requestBody' in operation:
            request_body = self._parse_request_body(operation['requestBody'])

        return EndpointDefinition(
            path=path,
            method=method.upper(),
            operation_id=operation.get('operationId'),
            summary=operation.get('summary'),
            description=operation.get('description'),
            tags=operation.get('tags', []),
            parameters=parameters,
            request_body=request_body,
            responses=operation.get('responses', {}),
            deprecated=operation.get('deprecated', False),
        )

    def _parse_parameters(self, parameters: List[Dict[str, Any]]) -> List["Parameter"]:
        """Parse parameter definitions, resolving ``$ref`` entries."""
        parsed = []
        for param in parameters:
            # Handle $ref
            if '$ref' in param:
                param = self.resolve_ref(param['$ref'])

            schema = param.get('schema', {})
            # A parameter-level 'default' wins when present; otherwise fall
            # back to the default declared inside the schema.
            default = param.get('default')
            if default is None and isinstance(schema, dict):
                default = schema.get('default')

            parsed.append(Parameter(
                name=param['name'],
                location=param['in'],
                required=param.get('required', False),
                schema=schema,
                description=param.get('description'),
                default=default,
            ))
        return parsed

    def _parse_request_body(self, request_body: Dict[str, Any]) -> "RequestBody":
        """Parse request body definition, resolving a ``$ref`` if present."""
        # Handle $ref
        if '$ref' in request_body:
            request_body = self.resolve_ref(request_body['$ref'])

        return RequestBody(
            content_types=request_body.get('content', {}),
            required=request_body.get('required', False),
            description=request_body.get('description'),
        )

    def resolve_ref(self, ref: str) -> Dict[str, Any]:
        """Resolve a $ref reference.

        Args:
            ref: Reference string like '#/components/schemas/CompanyDetails'

        Returns:
            The resolved object

        Raises:
            ValueError: If the reference is not local (does not start with
                '#/') or cannot be resolved within the specification.
        """
        if not ref.startswith('#/'):
            raise ValueError(f"Only local references are supported, got: {ref}")

        current: Any = self.spec
        for token in ref[2:].split('/'):
            # JSON Pointer escapes: '~1' is '/', '~0' is '~'.  '~1' must be
            # decoded first so that '~01' yields '~1' rather than '/'.
            key = token.replace('~1', '/').replace('~0', '~')

            if isinstance(current, dict) and key in current:
                current = current[key]
            elif (isinstance(current, list) and key.isdecimal()
                    and int(key) < len(current)):
                # Array elements are addressed by their decimal index.
                current = current[int(key)]
            else:
                # Also covers descending into a scalar, where a plain 'in'
                # test would be a substring check rather than a key lookup.
                raise ValueError(f"Could not resolve reference: {ref}")
        return current

    def get_endpoints(self) -> List["EndpointDefinition"]:
        """Get all endpoint definitions as a flat list."""
        return [
            endpoint
            for endpoints in self.parse().values()
            for endpoint in endpoints
        ]

    def get_schemas(self) -> Dict[str, Dict[str, Any]]:
        """Get all schema definitions (a shallow copy of the mapping)."""
        return self.schemas.copy()

    def get_endpoint_by_operation_id(self, operation_id: str) -> Optional["EndpointDefinition"]:
        """Find an endpoint by its operation ID, or return None."""
        for endpoint in self.get_endpoints():
            if endpoint.operation_id == operation_id:
                return endpoint
        return None