Source code for optiland.fileio.zemax.reader.parser

"""Zemax Data Parser

Parses a Zemax OpticStudio .zmx file into a ZemaxDataModel. The parser uses
a dispatch table of per-operand handler methods to process each line.

Kramer Harrison, 2024
"""

from __future__ import annotations

from typing import Any

import optiland.backend as be
from optiland.fileio.zemax.model import ZemaxDataModel
from optiland.materials import AbbeMaterial, BaseMaterial, Material
from optiland.physical_apertures import RadialAperture


[docs] class ZemaxDataParser: """Parses a Zemax .zmx file into a ZemaxDataModel. Args: filename: Path to the .zmx file to parse. Attributes: filename: The file path being parsed. data_model: The ZemaxDataModel being populated during parsing. """ def __init__(self, filename: str): self.filename = filename self.data_model = ZemaxDataModel() self._current_surf = -1 self._current_surf_data: dict[str, Any] = {} # Operand dispatch table — maps operand string to handler method self._operand_table = { "NAME": self._read_name, "FNUM": self._read_fno, "ENPD": self._read_epd, "OBNA": self._read_object_na, "FLOA": self._read_floating_stop, "FTYP": self._read_config_data, "XFLN": self._read_x_fields, "YFLN": self._read_y_fields, "WAVM": self._read_wavelength, "PWAV": self._read_primary_wave, "SURF": self._read_surface, "TYPE": self._read_surf_type, "PARM": self._read_surface_parameter, "CURV": self._read_radius, "DISZ": self._read_thickness, "CONI": self._read_conic, "GLAS": self._read_glass, "STOP": self._read_stop, "DIAM": self._read_diameter, "MODE": self._read_mode, "GCAT": self._read_glass_catalog, "FWGN": self._read_field_weights, "VDXN": self._read_vignette_decenter_x, "VDYN": self._read_vignette_decenter_y, "VCXN": self._read_vignette_compress_x, "VCYN": self._read_vignette_compress_y, "VANN": self._read_vignette_tangent_angle, "CLAP": self._read_circular_aperture, }
[docs] def parse(self) -> ZemaxDataModel: """Read the Zemax file and extract optical data into a ZemaxDataModel. Tries UTF-16 LE, UTF-8, and ISO-8859-1 encodings in that order. Returns: A populated ZemaxDataModel. Raises: ValueError: If the file cannot be read or contains no aperture data. """ encodings = ["utf-16", "utf-8", "iso-8859-1"] success = False for encoding in encodings: try: with open(self.filename, encoding=encoding) as fh: for line in fh: tokens = line.split() if not tokens: continue operand = tokens[0] if operand in self._operand_table: self._operand_table[operand](tokens) except (UnicodeError, UnicodeDecodeError): continue if self.data_model.aperture: success = True break if not success: raise ValueError("Failed to read Zemax file.") self._finalize_fields() self._finalize_surface() return self.data_model
# ------------------------------------------------------------------ # Per-operand handlers # ------------------------------------------------------------------ def _read_name(self, data: list[str]) -> None: self.data_model.name = " ".join(data[1:]) def _read_fno(self, data: list[str]) -> None: if int(data[2]) == 0: self.data_model.aperture["imageFNO"] = float(data[1]) elif int(data[2]) == 1: self.data_model.aperture["paraxialImageFNO"] = float(data[1]) def _read_epd(self, data: list[str]) -> None: self.data_model.aperture["EPD"] = float(data[1]) def _read_object_na(self, data: list[str]) -> None: if int(data[2]) == 0: self.data_model.aperture["objectNA"] = float(data[1]) elif int(data[2]) == 1: self.data_model.aperture["object_cone_angle"] = float(data[1]) def _read_floating_stop(self, data: list[str]) -> None: self.data_model.aperture["floating_stop"] = True def _read_config_data(self, data: list[str]) -> None: # Legacy ZEMAX files (e.g. VERS 6133) emit FTYP with only 1-2 tokens # where the current format has 8. Read defensively so a short FTYP # falls back to defaults instead of raising IndexError. def _safe_int(idx: int, default: int = 0) -> int: if idx >= len(data): return default tok = data[idx] if not tok: return default try: return int(tok) except ValueError: return default fields = self.data_model.fields fields["num_fields"] = _safe_int(3, 0) fields["type"] = { 0: "angle", 1: "object_height", 2: "paraxial_image_height", 3: "real_image_height", 4: "theodolite_angle", }.get(_safe_int(1, 0), "unsupported") self.data_model.wavelengths["num_wavelengths"] = _safe_int(4, 0) fields["object_space_telecentric"] = _safe_int(2, 0) == 1 fields["afocal_image_space"] = _safe_int(7, 0) == 1 # Legacy files may omit XFLN/YFLN entirely (implying a single # on-axis field). Seed defaults so downstream code does not hit # KeyError 'x' / 'y'. Any real XFLN/YFLN lines that follow will # overwrite these. fields.setdefault("x", [0.0]) fields.setdefault("y", [0.0]) if fields["num_fields"] <= 0: fields["num_fields"] = 1 def _read_x_fields(self, data: list[str]) -> None: n = self.data_model.fields["num_fields"] self.data_model.fields["x"] = [float(v) for v in data[1 : n + 1]] def _read_y_fields(self, data: list[str]) -> None: n = self.data_model.fields["num_fields"] self.data_model.fields["y"] = [float(v) for v in data[1 : n + 1]] def _read_wavelength(self, data: list[str]) -> None: val = float(data[2]) weight = float(data[3]) if len(data) > 3 else 1.0 if ( len(self.data_model.wavelengths["data"]) < self.data_model.wavelengths["num_wavelengths"] ): self.data_model.wavelengths["data"].append(val) self.data_model.wavelengths["weights"].append(weight) def _read_primary_wave(self, data: list[str]) -> None: self.data_model.wavelengths["primary_index"] = int(data[1]) - 1 def _read_surface(self, data: list[str]) -> None: if self._current_surf >= 0: self.data_model.surfaces[self._current_surf] = self._current_surf_data self._current_surf += 1 self._current_surf_data = { "type": "standard", "is_stop": False, "conic": 0.0, "material": "air", "aperture": None, } def _read_radius(self, data: list[str]) -> None: try: self._current_surf_data["radius"] = 1.0 / float(data[1]) except ZeroDivisionError: self._current_surf_data["radius"] = be.inf def _read_thickness(self, data: list[str]) -> None: if data[1] == "INFINITY": self._current_surf_data["thickness"] = be.inf else: self._current_surf_data["thickness"] = float(data[1]) def _read_conic(self, data: list[str]) -> None: self._current_surf_data["conic"] = float(data[1]) def _read_glass(self, data: list[str]) -> None: material_name = data[1] if material_name.upper() == "MIRROR": self._current_surf_data["material"] = "mirror" return self._current_surf_data["material"] = material_name try: self._current_surf_data["index"] = float(data[4].replace(",", ".")) self._current_surf_data["abbe"] = float(data[5].replace(",", ".")) except IndexError: self._current_surf_data["index"] = None self._current_surf_data["abbe"] = None # Try to resolve to a real Material from the glass catalog try: self._current_surf_data["material"] = Material(material_name) except ValueError: if self.data_model.glass_catalogs: for mfg in self.data_model.glass_catalogs: try: self._current_surf_data["material"] = Material( material_name, mfg.lower() ) break except ValueError: continue # Fall back to AbbeMaterial if catalog lookup failed if not isinstance(self._current_surf_data["material"], BaseMaterial): self._current_surf_data["material"] = AbbeMaterial( self._current_surf_data["index"], self._current_surf_data["abbe"], model="buchdahl", ) def _read_stop(self, data: list[str]) -> None: self._current_surf_data["is_stop"] = True def _read_diameter(self, data: list[str]) -> None: self._current_surf_data["diameter"] = float(data[1]) def _read_mode(self, data: list[str]) -> None: if data[1] != "SEQ": raise ValueError("Only sequential mode is supported.") def _read_glass_catalog(self, data: list[str]) -> None: self.data_model.glass_catalogs = data[1:] def _read_surf_type(self, data: list[str]) -> None: self._current_surf_data["type"] = { "STANDARD": "standard", "EVENASPH": "even_asphere", "ODDASPHE": "odd_asphere", "COORDBRK": "coordinate_break", "TOROIDAL": "toroidal", }.get(data[1], data[1].lower()) def _read_surface_parameter(self, data: list[str]) -> None: key = f"param_{int(data[1]) - 1}" self._current_surf_data[key] = float(data[2]) def _read_field_weights(self, data: list[str]) -> None: n = self.data_model.fields["num_fields"] self.data_model.fields["weights"] = [float(v) for v in data[1 : n + 1]] def _read_vignette_decenter_x(self, data: list[str]) -> None: n = self.data_model.fields["num_fields"] self.data_model.fields["vignette_decenter_x"] = [ float(v) for v in data[1 : n + 1] ] def _read_vignette_decenter_y(self, data: list[str]) -> None: n = self.data_model.fields["num_fields"] self.data_model.fields["vignette_decenter_y"] = [ float(v) for v in data[1 : n + 1] ] def _read_vignette_compress_x(self, data: list[str]) -> None: n = self.data_model.fields["num_fields"] self.data_model.fields["vignette_compress_x"] = [ float(v) for v in data[1 : n + 1] ] def _read_vignette_compress_y(self, data: list[str]) -> None: n = self.data_model.fields["num_fields"] self.data_model.fields["vignette_compress_y"] = [ float(v) for v in data[1 : n + 1] ] def _read_vignette_tangent_angle(self, data: list[str]) -> None: n = self.data_model.fields["num_fields"] self.data_model.fields["vignette_tangent_angle"] = [ float(v) for v in data[1 : n + 1] ] def _read_circular_aperture(self, data: list[str]) -> None: self._current_surf_data["aperture"] = RadialAperture( r_min=float(data[1]), r_max=float(data[2]) ) # ------------------------------------------------------------------ # Finalizers # ------------------------------------------------------------------ def _finalize_fields(self) -> None: """Deduplicate and sort fields by y-coordinate.""" fields = self.data_model.fields if "x" not in fields or "y" not in fields: return keys = ["x", "y"] for extra in [ "weights", "vignette_decenter_x", "vignette_decenter_y", "vignette_compress_x", "vignette_compress_y", "vignette_tangent_angle", ]: if extra in fields: keys.append(extra) zipped = list(zip(*(fields[k] for k in keys), strict=False)) seen = set() unique = [] for item in zipped: xy = item[:2] if xy not in seen: seen.add(xy) unique.append(item) sorted_items = sorted(unique, key=lambda it: it[1]) if not sorted_items: return unzipped = list(zip(*sorted_items, strict=False)) for i, k in enumerate(keys): fields[k] = list(unzipped[i]) def _finalize_surface(self) -> None: """Flush the last in-progress surface into the model.""" if self._current_surf >= 0: self.data_model.surfaces[self._current_surf] = self._current_surf_data