"""Generate a Godot ``.tscn`` scene of belt conveyors from a CSV export.

The script looks for exactly one ``*.csv`` file in its own directory, reads
the conveyor rows from it, repairs/extends the geometry in several passes and
writes ``<csv name>.tscn`` next to the CSV.

Pipeline (see ``main``):
    1. read rows                       -> ``load_rows`` / ``read_conveyors``
    1.5 stitch geometry-to-geometry gaps -> ``stitch_gaps``
    1. chain-continue VFD-only rows    -> ``chain_continue``
    1.75 straight-run de-jitter        -> ``straighten_runs``
    2. emit straight + curved nodes    -> ``build_scene_lines``
"""

import csv
import math
from pathlib import Path

# -----------------------
# AUTO CSV DISCOVERY
# -----------------------
SCRIPT_DIR = Path(__file__).resolve().parent

# -----------------------
# CONFIG
# -----------------------
SCALE = 0.0254
FIXED_Y = 2.5

STRAIGHT_BELT_ID = "3_38ygf"
CURVED_BELT_ID = "1_ef28r"
STRAIGHT_BELT_SCENE = "res://parts/assemblies/BeltConveyorAssembly.tscn"
CURVED_BELT_SCENE = "res://parts/assemblies/CurvedBeltConveyorAssembly.tscn"

MIN_CURVE_DEG = 1.0
MIN_REAL_ROT_DEG = 5.0

# Geometry (must match asset)
CURVE_INNER_RADIUS = 0.5
BELT_WIDTH_DEFAULT = 1.524  # default size.z
DEGEN_EPS = 0.001  # meters (true zero-length)

# Curved asset forward-axis / pivot assumption:
# if the curved assembly faces "backwards" compared to the straight
# conveyors, adding 180 degrees fixes that.
CURVE_ROT_OFFSET = math.pi  # 180 degrees

# Gap stitching tolerance
EPS = 0.05  # meters

# Straight-run "de-jitter" (alignment) pass
STRAIGHTEN_RUNS = True
# If a "straight run" deviates more than this, skip straightening that run.
STRAIGHTEN_MAX_DEV_DEG = 2.0

# Optional width columns (if they are added to the CSV later)
# - *_m assumed meters
# - *_in assumed inches -> meters via SCALE
WIDTH_COLS = [
    ("belt_width_m", 1.0),
    ("width_m", 1.0),
    ("belt_width_in", SCALE),
    ("width_in", SCALE),
    ("belt_width", SCALE),  # treat as inches by default
    ("width", SCALE),
]

# Columns that must all be non-blank for an included row to carry geometry.
COORD_COLS = ("start_x", "start_y", "end_x", "end_y")

# Module-level state filled by main().
straight = {}  # geometry + chain-continued + promoted inline gaps
vfd_only = []  # included=0


# -----------------------
# HELPERS
# -----------------------
def rot_deg(rot_y):
    """Return the absolute value of ``rot_y`` (radians) in degrees."""
    return abs(rot_y * 180.0 / math.pi)


def transform_from_points(x1, y1, x2, y2):
    """Build a conveyor record from a start/end point pair given in CSV units.

    Coordinates are multiplied by ``SCALE``; the CSV Y axis is negated to
    become the scene Z axis (AutoCAD +Y becomes Godot -Z).

    Returns a dict with ``length``, ``pos`` (x, FIXED_Y, z midpoint) and
    ``rot_y`` (radians).
    """
    dx = x2 - x1
    dy = y2 - y1
    length = math.hypot(dx, dy) * SCALE
    mid_x = (x1 + x2) / 2 * SCALE
    mid_z = -(y1 + y2) / 2 * SCALE
    rot_y = math.atan2(-dy, dx)  # AutoCAD +Y becomes Godot -Z
    return {
        "length": length,
        "pos": (mid_x, FIXED_Y, mid_z),
        "rot_y": rot_y,
    }


def transform3d(rot_y, x, y, z):
    """Format a ``Transform3D(...)`` literal for a yaw of ``rot_y`` at (x, y, z).

    The basis is written so that local +X maps to ``unit_fwd(rot_y)`` in the
    XZ plane.
    """
    c = math.cos(rot_y)
    s = math.sin(rot_y)
    return (
        f"Transform3D({c}, 0, {-s}, "
        f"0, 1, 0, "
        f"{s}, 0, {c}, "
        f"{x}, {y}, {z})"
    )


def parse_key(key):
    """Split ``"<prefix>_<section>"`` into ``(prefix, int(section))``.

    Only the first two underscore-separated fields are used. Raises
    ``IndexError``/``ValueError`` for keys that do not have that shape.
    """
    p = key.split("_")
    return p[0], int(p[1])


def unit_fwd(rot_y):
    """Return the unit forward vector (x, z) for yaw ``rot_y``."""
    return math.cos(rot_y), math.sin(rot_y)


def unit_right(rot_y):
    """Return the forward vector rotated by +90 degrees in the XZ plane."""
    return -math.sin(rot_y), math.cos(rot_y)


def start_point(conv):
    """Return the (x, z) start of ``conv`` derived from pos, rot_y and length."""
    dx, dz = unit_fwd(conv["rot_y"])
    half = conv["length"] / 2
    return (conv["pos"][0] - dx * half, conv["pos"][2] - dz * half)


def end_point(conv):
    """Return the (x, z) end of ``conv`` derived from pos, rot_y and length."""
    dx, dz = unit_fwd(conv["rot_y"])
    half = conv["length"] / 2
    return (conv["pos"][0] + dx * half, conv["pos"][2] + dz * half)


def add_straight_node(lines, name, rot_y, cx, cz, length, width):
    """Append the scene text for one straight belt instance to ``lines``."""
    t = transform3d(rot_y, cx, FIXED_Y, cz)
    lines.append(f'[node name="{name}" parent="." instance=ExtResource("{STRAIGHT_BELT_ID}")]')
    lines.append(f"transform = {t}")
    lines.append("right_side_guards_enabled = false")
    lines.append("left_side_guards_enabled = false")
    lines.append("head_end_leg_enabled = false")
    lines.append("tail_end_leg_enabled = false")
    lines.append("enable_comms = true")
    lines.append(f'speed_tag_name = "{name}_OIP"')
    lines.append(f"size = Vector3({length:.6f}, 0.5, {width:.6f})")
    lines.append("")


def add_curved_node(lines, name, rot_y, px, pz, angle_deg):
    """Append the scene text for one curved belt instance to ``lines``."""
    t = transform3d(rot_y, px, FIXED_Y, pz)
    lines.append(f'[node name="{name}" parent="." instance=ExtResource("{CURVED_BELT_ID}")]')
    lines.append(f"transform = {t}")
    lines.append(f"inner_radius = {CURVE_INNER_RADIUS}")
    lines.append(f"conveyor_angle = {angle_deg:.3f}")
    lines.append("enable_comms = true")
    lines.append(f'speed_tag_name = "{name}_OIP"')
    lines.append("")


def angle_between_fwd(a_fwd, b_fwd):
    """Return the unsigned angle (radians, 0..pi) between two (x, z) vectors."""
    ax, az = a_fwd
    bx, bz = b_fwd
    cross = ax * bz - az * bx
    dot = ax * bx + az * bz
    return abs(math.atan2(cross, dot))


def rebuild_from_start_end(conv, sx, sz, ex, ez):
    """Overwrite ``conv``'s geometry so it spans (sx, sz) -> (ex, ez).

    Updates ``length``, ``pos``, ``rot_y``, ``start``, ``end`` and ``fwd``
    in place; other keys are left untouched.
    """
    length = math.hypot(ex - sx, ez - sz)
    cx = (sx + ex) / 2
    cz = (sz + ez) / 2
    rot_y = math.atan2(ez - sz, ex - sx)
    conv["length"] = length
    conv["pos"] = (cx, FIXED_Y, cz)
    conv["rot_y"] = rot_y
    conv["start"] = (sx, sz)
    conv["end"] = (ex, ez)
    conv["fwd"] = unit_fwd(rot_y)


def unit_from_to(ax, az, bx, bz):
    """Return ``(unit_vector, distance)`` from A to B; zero vector if A == B."""
    dx = bx - ax
    dz = bz - az
    d = math.hypot(dx, dz)
    if d <= 0.0:
        return (0.0, 0.0), 0.0
    return (dx / d, dz / d), d


def get_width_from_row(row):
    """Return the belt width in meters from the first usable width column.

    Columns are tried in ``WIDTH_COLS`` order; blank or non-numeric cells are
    skipped so a later column can still supply the value. Falls back to
    ``BELT_WIDTH_DEFAULT``.
    """
    for col, mul in WIDTH_COLS:
        if col in row:
            v = str(row[col]).strip()
            if v != "":
                try:
                    return float(v) * mul
                except ValueError:
                    # Not a number: deliberately fall through to the next
                    # candidate column rather than failing the whole row.
                    pass
    return BELT_WIDTH_DEFAULT


def best_fit_dir(points):
    """Return a unit (x, z) direction of the best-fit line through ``points``.

    PCA in 2D: angle = 0.5 * atan2(2*Sxz, Sxx - Szz). Returns ``(1.0, 0.0)``
    for fewer than two points or when all points coincide. The sign of the
    returned direction is arbitrary; callers orient it themselves.
    """
    if len(points) < 2:
        return (1.0, 0.0)
    mx = sum(x for x, _ in points) / len(points)
    mz = sum(z for _, z in points) / len(points)
    sxx = szz = sxz = 0.0
    for x, z in points:
        dx = x - mx
        dz = z - mz
        sxx += dx * dx
        szz += dz * dz
        sxz += dx * dz
    if sxx == 0.0 and szz == 0.0 and sxz == 0.0:
        return (1.0, 0.0)
    ang = 0.5 * math.atan2(2.0 * sxz, (sxx - szz))
    return (math.cos(ang), math.sin(ang))


def project_to_line(origin, direction, p):
    """Project point ``p`` onto the line through ``origin`` along ``direction``.

    ``direction`` is used as given (callers pass a unit vector).
    """
    ox, oz = origin
    dx, dz = direction
    px, pz = p
    t = (px - ox) * dx + (pz - oz) * dz
    return (ox + dx * t, oz + dz * t)


def cell_text(row, col):
    """Return the stripped text of ``row[col]``; "" if missing or ``None``.

    ``csv.DictReader`` fills cells missing from short rows with ``None``;
    treating those as blank avoids ``float(None)`` further down.
    """
    value = row.get(col)
    return "" if value is None else str(value).strip()


def refresh_degenerate_flags(conveyors):
    """Recompute ``degenerate`` for every record from its current length."""
    for c in conveyors.values():
        c["degenerate"] = (c["length"] <= DEGEN_EPS)


def find_csv(script_dir):
    """Return the single ``*.csv`` in ``script_dir``.

    Raises:
        RuntimeError: if there is no CSV, or more than one.
    """
    csv_files = list(script_dir.glob("*.csv"))
    if not csv_files:
        raise RuntimeError("No CSV file found in script directory.")
    if len(csv_files) > 1:
        raise RuntimeError(
            "Multiple CSV files found. Please keep only ONE CSV in the folder:\n"
            + "\n".join(f.name for f in csv_files)
        )
    return csv_files[0]


# -----------------------
# READ CSV
# -----------------------
def load_rows(rows, conveyors, spurs):
    """Sort CSV rows into geometry records and VFD-only entries.

    Args:
        rows: iterable of dict rows (as produced by ``csv.DictReader``).
        conveyors: dict filled with ``key -> record`` for rows with
            ``included == "1"`` and all four coordinates present.
        spurs: list extended with ``{"name", "prefix", "sec"}`` for rows whose
            ``included`` is anything other than ``"1"``.

    Rows whose ``record_type`` is present and not ``"CONVEYOR"`` are ignored.
    Included rows with blank/missing coordinates are dropped.
    """
    for row in rows:
        # Only process conveyor rows.
        if row.get("record_type", "CONVEYOR") != "CONVEYOR":
            continue

        key = row["conveyor_key"].strip()
        included = str(row["included"]).strip()
        prefix, sec = parse_key(key)

        if included != "1":
            spurs.append({"name": key, "prefix": prefix, "sec": sec})
            continue

        if not all(cell_text(row, c) for c in COORD_COLS):
            continue

        conv = transform_from_points(
            float(row["start_x"]),
            float(row["start_y"]),
            float(row["end_x"]),
            float(row["end_y"]),
        )
        conv["name"] = key
        conv["prefix"] = prefix
        conv["sec"] = sec
        conv["width"] = get_width_from_row(row)
        conveyors[key] = conv

    # Precompute geometry data
    for c in conveyors.values():
        c["start"] = start_point(c)
        c["end"] = end_point(c)
        c["fwd"] = unit_fwd(c["rot_y"])


def read_conveyors(csv_path, conveyors, spurs):
    """Read ``csv_path`` with ``csv.DictReader`` and feed it to ``load_rows``."""
    with open(csv_path, newline="") as f:
        load_rows(csv.DictReader(f), conveyors, spurs)


# -----------------------
# 1.5) Extend geometry-to-geometry gaps (degenerate-aware)
# -----------------------
def stitch_gaps(conveyors):
    """Close gaps between consecutive, aligned records of the same prefix.

    Records are grouped by prefix and ordered by section number. For each
    neighbouring pair (a, b):

    * CASE 1 - ``b`` is degenerate (zero length): try to rebuild ``b`` as
      a.end -> b.anchor; failing that, as a.end -> start of the next
      non-degenerate record. Either rebuild requires the direction to be
      within ``MIN_CURVE_DEG`` of ``a`` and the span to exceed ``EPS``.
    * CASE 2 - otherwise, if a and b are aligned and more than ``EPS`` apart,
      ``a`` is extended so it ends at b.start.
    """
    max_turn = math.radians(MIN_CURVE_DEG)

    by_prefix = {}
    for c in conveyors.values():
        sx, sz = c["start"]
        ex, ez = c["end"]
        c["degenerate"] = (math.hypot(ex - sx, ez - sz) <= DEGEN_EPS)
        # For degenerate belts, start == end is the anchor location.
        c["anchor"] = c["start"]
        by_prefix.setdefault(c["prefix"], []).append(c)

    for items in by_prefix.values():
        items.sort(key=lambda x: x["sec"])
        i = 0
        while i < len(items) - 1:
            a = items[i]
            b = items[i + 1]

            # -------------------------------
            # CASE 1: b is degenerate
            # First try: rebuild b from a.end -> b.anchor
            # -------------------------------
            if b["degenerate"]:
                ax, az = a["end"]
                bx, bz = b["anchor"]
                to_vec, dist = unit_from_to(ax, az, bx, bz)
                if dist > EPS:
                    if angle_between_fwd(a["fwd"], to_vec) <= max_turn:
                        rebuild_from_start_end(b, ax, az, bx, bz)
                        b["degenerate"] = (b["length"] <= DEGEN_EPS)
                        i += 1
                        continue

                # Fallback: a.end -> next non-degenerate start
                j = i + 2
                while j < len(items) and items[j]["degenerate"]:
                    j += 1
                if j < len(items):
                    c = items[j]
                    if angle_between_fwd(a["fwd"], c["fwd"]) <= max_turn:
                        sx, sz = a["end"]
                        ex, ez = c["start"]
                        span = math.hypot(ex - sx, ez - sz)
                        if span > EPS:
                            rebuild_from_start_end(b, sx, sz, ex, ez)
                            b["degenerate"] = (b["length"] <= DEGEN_EPS)
                            i += 1
                            continue

            # -------------------------------
            # CASE 2: normal gap extension
            # extend a -> b.start
            # -------------------------------
            if angle_between_fwd(a["fwd"], b["fwd"]) > max_turn:
                i += 1
                continue  # real turn

            dx = b["start"][0] - a["end"][0]
            dz = b["start"][1] - a["end"][1]
            gap = math.hypot(dx, dz)
            if gap <= EPS:
                i += 1
                continue  # already touching

            sx, sz = a["start"]
            ex, ez = b["start"]
            rebuild_from_start_end(a, sx, sz, ex, ez)
            a["degenerate"] = (a["length"] <= DEGEN_EPS)
            i += 1

    # Normalize flags
    refresh_degenerate_flags(conveyors)


# -----------------------
# 1) Chain-continue VFD-only straights (NO rendering yet)
# -----------------------
def _extension_record(item, ref, start, end):
    """Build a record for ``item`` that copies ``ref``'s length/heading/width."""
    dx, dz = ref["fwd"]
    length = ref["length"]
    sx, sz = start
    return {
        "name": item["name"],
        "prefix": item["prefix"],
        "sec": item["sec"],
        "length": length,
        "pos": (sx + dx * length / 2, FIXED_Y, sz + dz * length / 2),
        "rot_y": ref["rot_y"],
        "start": (sx, sz),
        "end": end,
        "fwd": (dx, dz),
        "degenerate": (length <= DEGEN_EPS),
        "anchor": (sx, sz),
        "width": ref.get("width", BELT_WIDTH_DEFAULT),
    }


def chain_continue(conveyors, spurs):
    """Place VFD-only entries that have a neighbour on exactly one side.

    An entry whose section-1 neighbour exists (and section+1 does not) is
    appended after that neighbour; the mirrored case is prepended before the
    section+1 neighbour. New records are added to ``conveyors`` and the scan
    repeats until a full pass places nothing, so chains grow one link per
    pass.

    Returns:
        set of names that were placed this way.
    """
    placed_chain = set()
    made_progress = True
    while made_progress:
        made_progress = False
        for item in spurs:
            name = item["name"]
            if name in conveyors or name in placed_chain:
                continue

            prefix = item["prefix"]
            sec = item["sec"]
            prev_key = f"{prefix}_{sec-1}"
            next_key = f"{prefix}_{sec+1}"

            if prev_key in conveyors and next_key not in conveyors:
                ref = conveyors[prev_key]
                dx, dz = ref["fwd"]
                length = ref["length"]
                sx, sz = ref["end"]
                end = (sx + dx * length, sz + dz * length)
                conveyors[name] = _extension_record(item, ref, (sx, sz), end)
                placed_chain.add(name)
                made_progress = True
                continue

            if next_key in conveyors and prev_key not in conveyors:
                ref = conveyors[next_key]
                dx, dz = ref["fwd"]
                length = ref["length"]
                ex, ez = ref["start"]
                start = (ex - dx * length, ez - dz * length)
                conveyors[name] = _extension_record(item, ref, start, (ex, ez))
                placed_chain.add(name)
                made_progress = True
                continue

    return placed_chain


# -----------------------
# 1.75) Straight-run de-jitter (fix tiny rotations from noisy points)
# -----------------------
def _straighten_run(run):
    """Snap one run of near-collinear segments onto their best-fit line."""
    # Nodes: start of first, then each end
    nodes = [run[0]["start"]] + [seg["end"] for seg in run]

    # Fit direction, then orient it from first node -> last node
    d = best_fit_dir(nodes)
    fx, fz = nodes[0]
    lx, lz = nodes[-1]
    if (lx - fx) * d[0] + (lz - fz) * d[1] < 0:
        d = (-d[0], -d[1])

    # If any segment in this run is way off, skip straightening this run
    max_dev = 0.0
    for seg in run:
        if seg["length"] > DEGEN_EPS:
            max_dev = max(max_dev, angle_between_fwd(seg["fwd"], d))
    if max_dev > math.radians(STRAIGHTEN_MAX_DEV_DEG):
        return

    origin = nodes[0]
    new_nodes = [project_to_line(origin, d, p) for p in nodes]

    # Rebuild each segment from projected nodes
    for seg, p0, p1 in zip(run, new_nodes[:-1], new_nodes[1:]):
        rebuild_from_start_end(seg, p0[0], p0[1], p1[0], p1[1])


def straighten_runs(conveyors):
    """Align runs of consecutive records whose headings differ by <= MIN_CURVE_DEG.

    Within each prefix (ordered by section) maximal runs of two or more
    aligned records are projected onto a common best-fit line, unless any
    non-degenerate segment deviates from it by more than
    ``STRAIGHTEN_MAX_DEV_DEG``.
    """
    max_turn = math.radians(MIN_CURVE_DEG)

    by_prefix = {}
    for c in conveyors.values():
        by_prefix.setdefault(c["prefix"], []).append(c)

    for items in by_prefix.values():
        items.sort(key=lambda x: x["sec"])
        run_start = 0
        while run_start < len(items):
            run_end = run_start
            while run_end + 1 < len(items):
                a = items[run_end]
                b = items[run_end + 1]
                if angle_between_fwd(a["fwd"], b["fwd"]) > max_turn:
                    break
                run_end += 1

            # Run is items[run_start : run_end+1]
            if run_end - run_start >= 1:
                _straighten_run(items[run_start:run_end + 1])

            run_start = run_end + 1


# -----------------------
# WRITE TSCN
# -----------------------
def scene_header_lines():
    """Return the scene header lines, using the configured resource ids."""
    return [
        '[gd_scene load_steps=3 format=3]',
        '',
        f'[ext_resource type="PackedScene" path="{STRAIGHT_BELT_SCENE}" id="{STRAIGHT_BELT_ID}"]',
        f'[ext_resource type="PackedScene" path="{CURVED_BELT_SCENE}" id="{CURVED_BELT_ID}"]',
        '',
        '[node name="GeneratedConveyors" type="Node3D"]',
        '',
    ]


def find_prev_next(prefix, sec, conveyors=None):
    """Find the nearest existing neighbours of ``<prefix>_<sec>``.

    If ``sec - 1`` / ``sec + 1`` are not present, the search walks outward:
    downward to section 1 (section 0 is never considered) and upward for at
    most 199 sections (hard cap to avoid an unbounded search on bad data).

    Args:
        prefix: key prefix.
        sec: section number of the entry being placed.
        conveyors: record dict to search; defaults to the module-level
            ``straight``.

    Returns:
        ``(prev_key, next_key)``, or ``(None, None)`` if either is missing.
    """
    if conveyors is None:
        conveyors = straight

    prev_key = f"{prefix}_{sec-1}"
    next_key = f"{prefix}_{sec+1}"

    if prev_key not in conveyors:
        k = sec - 1
        while k > 0:
            cand = f"{prefix}_{k}"
            if cand in conveyors:
                prev_key = cand
                break
            k -= 1

    if next_key not in conveyors:
        k = sec + 1
        # hard cap to avoid infinite search on bad data
        while k < sec + 200:
            cand = f"{prefix}_{k}"
            if cand in conveyors:
                next_key = cand
                break
            k += 1

    if prev_key not in conveyors or next_key not in conveyors:
        return None, None
    return prev_key, next_key


def build_scene_lines(conveyors, spurs):
    """Produce the scene text lines for all straights and remaining spurs.

    Straights are emitted in sorted key order (zero-length ones are skipped).
    Each remaining VFD-only entry with neighbours on both sides is then
    either promoted to an inline straight (added to ``conveyors`` and
    emitted) or emitted as a curved belt.

    Returns:
        ``(lines, placed_spurs)`` where ``placed_spurs`` is the set of names
        emitted as curved belts.
    """
    lines = scene_header_lines()

    written_straights = set()
    for name in sorted(conveyors.keys()):
        c = conveyors[name]
        # Only skip true zero-length belts
        if c["length"] <= DEGEN_EPS:
            continue
        cx, _, cz = c["pos"]
        width = c.get("width", BELT_WIDTH_DEFAULT)
        add_straight_node(lines, name, c["rot_y"], cx, cz, c["length"], width)
        written_straights.add(name)

    # Spur logic: resilient neighbour lookup, curved placement and rotation.
    placed_spurs = set()
    for spur in spurs:
        name = spur["name"]
        if name in conveyors:
            continue

        prefix = spur["prefix"]
        sec = spur["sec"]
        prev_key, next_key = find_prev_next(prefix, sec, conveyors)
        if not prev_key or not next_key:
            continue

        prev = conveyors[prev_key]
        nxt = conveyors[next_key]

        # Signed turn from prev heading to next heading.
        pfx, pfz = prev["fwd"]
        nfx, nfz = nxt["fwd"]
        cross = pfx * nfz - pfz * nfx
        dot = pfx * nfx + pfz * nfz
        delta = math.atan2(cross, dot)
        angle_deg = abs(delta) * 180.0 / math.pi
        if angle_deg < MIN_CURVE_DEG:
            continue

        prev_rot_deg = rot_deg(prev["rot_y"])
        next_rot_deg = rot_deg(nxt["rot_y"])

        # Inline gap (render straight)
        if prev_rot_deg < MIN_REAL_ROT_DEG and next_rot_deg < MIN_REAL_ROT_DEG:
            sx, sz = prev["end"]
            ex, ez = nxt["start"]
            dx = ex - sx
            dz = ez - sz
            length = math.hypot(dx, dz)
            if length < 0.01:
                continue
            cx = (sx + ex) / 2
            cz = (sz + ez) / 2
            rot_y = math.atan2(dz, dx)
            width = prev.get("width", BELT_WIDTH_DEFAULT)
            # Registering the record makes it visible to neighbour lookups
            # for the spurs processed after this one.
            conveyors[name] = {
                "name": name,
                "prefix": prefix,
                "sec": sec,
                "length": length,
                "pos": (cx, FIXED_Y, cz),
                "rot_y": rot_y,
                "start": (sx, sz),
                "end": (ex, ez),
                "fwd": unit_fwd(rot_y),
                "degenerate": (length <= DEGEN_EPS),
                "anchor": (sx, sz),
                "width": width,
            }
            if name not in written_straights and length > DEGEN_EPS:
                add_straight_node(lines, name, rot_y, cx, cz, length, width)
                written_straights.add(name)
            continue

        # -----------------------
        # Curved spur placement + 180 degree yaw offset
        #
        # Assumes the curved asset pivot is the CIRCLE CENTER of the BELT
        # CENTERLINE:
        #   r_center = inner_radius + width/2
        #   place    = prev.end + inward * r_center
        # and CURVE_ROT_OFFSET (pi) is applied in case the asset faces
        # backwards.
        # -----------------------
        turn_sign = 1.0 if delta > 0 else -1.0
        rx, rz = unit_right(prev["rot_y"])
        end_x, end_z = prev["end"]
        prev_width = prev.get("width", BELT_WIDTH_DEFAULT)

        # inward points from the tangent point toward the curve center
        inward_x = -rx * turn_sign
        inward_z = -rz * turn_sign

        # centerline radius from inner edge
        r_center = CURVE_INNER_RADIUS + (prev_width / 2.0)
        place_x = end_x + inward_x * r_center
        place_z = end_z + inward_z * r_center

        add_curved_node(
            lines,
            name,
            prev["rot_y"] + CURVE_ROT_OFFSET,
            place_x,
            place_z,
            angle_deg,
        )
        placed_spurs.add(name)

    return lines, placed_spurs


def print_report(conveyors, spurs, placed_chain, placed_spurs):
    """Print the summary counts plus any leftover / degenerate names."""
    leftovers = [
        v["name"]
        for v in spurs
        if v["name"] not in placed_spurs and v["name"] not in conveyors
    ]
    geom_written = len([k for k, c in conveyors.items() if c["length"] > DEGEN_EPS])
    geom_degen = [k for k, c in conveyors.items() if c["length"] <= DEGEN_EPS]

    print(
        f"Geometry written: {geom_written - len(placed_chain)}, "
        f"chain-continued straights: {len(placed_chain)}, "
        f"spurs placed: {len(placed_spurs)}, "
        f"leftovers: {len(leftovers)}"
    )
    if leftovers:
        print("Leftovers:", ", ".join(leftovers))
    if geom_degen:
        print("Still degenerate geometry (NOT written):", ", ".join(sorted(geom_degen)))


def main():
    """Convert the CSV next to this script into a ``.tscn`` scene file."""
    csv_path = find_csv(SCRIPT_DIR)
    out_tscn = csv_path.with_suffix(".tscn")
    print(f"Using CSV: {csv_path.name}")
    print(f"Generating scene: {out_tscn.name}")

    straight.clear()
    vfd_only.clear()
    read_conveyors(csv_path, straight, vfd_only)

    stitch_gaps(straight)
    placed_chain = chain_continue(straight, vfd_only)
    if STRAIGHTEN_RUNS:
        straighten_runs(straight)
    # Final normalize flags after straightening
    refresh_degenerate_flags(straight)

    lines, placed_spurs = build_scene_lines(straight, vfd_only)

    # -----------------------
    # WRITE FILE
    # -----------------------
    out_tscn.write_text("\n".join(lines), encoding="utf-8")
    print_report(straight, vfd_only, placed_chain, placed_spurs)


if __name__ == "__main__":
    main()