From 7c076ead77a67ce8c24a9d7d85c1f709b783f9a4 Mon Sep 17 00:00:00 2001 From: Ryan Howard Date: Fri, 26 Jun 2026 02:15:31 -0400 Subject: [PATCH 1/2] add the pipette-current-speed-test to the pipette assembly diagnostic --- .../opentrons_api/helpers_ot3.py | 8 + .../pipette_assembly_qc_ot3.py | 302 +++++++++++++++++- 2 files changed, 307 insertions(+), 3 deletions(-) diff --git a/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py b/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py index d5ef6e29d21..03ae3ec0ced 100644 --- a/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py +++ b/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py @@ -1660,3 +1660,11 @@ def _is_int(s: str) -> bool: else: ctx.pause("扫描失败,请重试" if LOCALIZE else "Failed to scan, try again.") return default + + +def enable_stall_detection(api: SyncHardwareAPI, enable: bool) -> None: + """Toggle stall detection on or off.""" + flags = api.hardware_feature_flags + flags.stall_detection_enabled = enable + # for some reason mypy gets the property method but isn't the seeing the setter as a member + api.hardware_feature_flags = flags # type: ignore[attr-defined] diff --git a/hardware-testing/hardware_testing/production_qc_protocols/pipette_assembly_qc_ot3.py b/hardware-testing/hardware_testing/production_qc_protocols/pipette_assembly_qc_ot3.py index ece59bff94f..b2c8c48a09c 100644 --- a/hardware-testing/hardware_testing/production_qc_protocols/pipette_assembly_qc_ot3.py +++ b/hardware-testing/hardware_testing/production_qc_protocols/pipette_assembly_qc_ot3.py @@ -41,6 +41,11 @@ FailedTipStateCheck, InstrumentProbeType, ) +from opentrons.config.defaults_ot3 import ( + DEFAULT_RUN_CURRENT, + DEFAULT_MAX_SPEEDS, + DEFAULT_ACCELERATIONS, +) from opentrons_hardware.firmware_bindings import ArbitrationId, NodeId, MessageId from opentrons_hardware.firmware_bindings.messages import MessageDefinition from opentrons_hardware.firmware_bindings.messages.message_definitions import ( @@ -53,6 +58,8 @@ from opentrons.hardware_control.types import ( Axis, OT3Mount, + OT3AxisKind, + CriticalPoint, ) from opentrons_shared_data.errors.exceptions import StallOrCollisionDetectedError @@ -405,6 +412,7 @@ class TestSection(enum.Enum): TIP_SENSOR = "TIP-SENSOR" LIQUID_PROBE = "LIQUID-PROBE" ENCODER_CLEAN = "ENCODER-CLEAN" + CURRENT_SPEED = "CURRENT-SPEED" @dataclass @@ -428,6 +436,7 @@ class TestConfig: pipette_volume: int mount: OT3Mount trash_loc_counter: int + pipette_serial: str @dataclass @@ -476,6 +485,42 @@ class LabwareLocations: ) +DEFAULT_TRIALS = 5 +STALL_THRESHOLD_MM = 0.1 +TEST_ACCELERATION = 1500 # used during gravimetric tests + +DEFAULT_ACCELERATION = DEFAULT_ACCELERATIONS.low_throughput[OT3AxisKind.P] +DEFAULT_CURRENT = DEFAULT_RUN_CURRENT.low_throughput[OT3AxisKind.P] +DEFAULT_SPEED = DEFAULT_MAX_SPEEDS.low_throughput[OT3AxisKind.P] +MUST_PASS_CURRENT = round(DEFAULT_CURRENT * 0.75, 2) # the target spec (must pass here) +assert ( + MUST_PASS_CURRENT < DEFAULT_CURRENT +), "must-pass current must be less than default current" +TEST_SPEEDS = [ + DEFAULT_MAX_SPEEDS.low_throughput[OT3AxisKind.P] - 20, + DEFAULT_MAX_SPEEDS.low_throughput[OT3AxisKind.P], + DEFAULT_MAX_SPEEDS.low_throughput[OT3AxisKind.P] + 10, + DEFAULT_MAX_SPEEDS.low_throughput[OT3AxisKind.P] + 20, +] +PLUNGER_CURRENTS_SPEED = { + MUST_PASS_CURRENT - 0.45: TEST_SPEEDS, + MUST_PASS_CURRENT - 0.35: TEST_SPEEDS, + MUST_PASS_CURRENT - 0.25: TEST_SPEEDS, + MUST_PASS_CURRENT: TEST_SPEEDS, + DEFAULT_CURRENT: TEST_SPEEDS, +} + +MUST_PASS_CURRENT_TURE = 0.4 +PASS_PRINT_LIST: List[str] = [] + +MAX_SPEED = max(TEST_SPEEDS) +MAX_CURRENT = max(max(list(PLUNGER_CURRENTS_SPEED.keys())), 1.0) +assert MAX_CURRENT == DEFAULT_CURRENT, ( + f"do not test current ({MAX_CURRENT}) " + f"above the software's default current ({DEFAULT_CURRENT})" +) + + # --------------- Helpers ------------- @@ -1974,6 +2019,244 @@ def test_encoder( # ----------- END TEST ENCODER_CLEAN ----------- +# ----------- TEST Current Speed ----------- + + +def _get_cs_test_tag( + current: float, speed: float, trial: int, direction: str, pos: str +) -> str: + return f"current-{current}-speed-trial-{trial}-{speed}-{direction}-{pos}" + + +def _get_cs_section_tag(current: float) -> str: + return f"CURRENT-{current}-AMPS" + + +def _includes_result(current: float, speed: float, cfg: TestConfig) -> bool: + pass_currents = { + "P1KS": 0.5, + "P50S": 0.5, + "P1KP": 0.5, + "P1KM": 0.75, + "P50M": 0.75, + } + return current >= pass_currents[cfg.pipette_serial[:4]] + + +def build_current_speed_csv_sections(cfg: TestConfig) -> List[CSVSection]: + """Build CSV Lines.""" + sections = [ + CSVSection( + title=_get_cs_section_tag(current), + lines=[ + CSVLine( + _get_cs_test_tag(current, speed, trial, direction, pos), + [float, float, float, float, CSVResult] + if _includes_result(current, speed, cfg) + else [float, float, float, float], + ) + for speed in sorted(PLUNGER_CURRENTS_SPEED[current], reverse=False) + for trial in range(DEFAULT_TRIALS) + for direction in ["down", "up"] + for pos in ["start", "end"] + ], + ) + for current in sorted(list(PLUNGER_CURRENTS_SPEED.keys()), reverse=False) + ] + return sections + + +def _set_default_and_home_plunger(api: SyncHardwareAPI, mount: OT3Mount) -> None: + # restore default current/speed before homing + pipette_ax = Axis.of_main_tool_actuator(mount) + helpers_ot3.set_gantry_load_per_axis_current_settings_ot3_sync( + api, pipette_ax, run_current=1.0 + ) + helpers_ot3.set_gantry_load_per_axis_motion_settings_ot3_sync( + api, + pipette_ax, + default_max_speed=DEFAULT_SPEED / 2, + acceleration=DEFAULT_ACCELERATION, + ) + api.home([pipette_ax]) + + +def _move_plunger( + api: SyncHardwareAPI, + mount: OT3Mount, + p: float, + s: float, + c: float, + a: float, +) -> None: + # set max currents/speeds, to make sure we're not accidentally limiting ourselves + pipette_ax = Axis.of_main_tool_actuator(mount) + helpers_ot3.set_gantry_load_per_axis_current_settings_ot3_sync( + api, pipette_ax, run_current=c + ) + helpers_ot3.set_gantry_load_per_axis_motion_settings_ot3_sync( + api, + pipette_ax, + default_max_speed=MAX_SPEED, + acceleration=a, + ) + # move + helpers_ot3.move_plunger_absolute_ot3_sync( + api, mount, p, speed=s, motor_current=c, expect_stalls=True + ) + + +def _record_plunger_alignment( + api: SyncHardwareAPI, + cfg: TestConfig, + report: CSVReport, + trial: int, + current: float, + speed: float, + direction: str, + position: str, +) -> bool: + pipette_ax = Axis.of_main_tool_actuator(cfg.mount) + _current_pos = api.current_position_ot3(cfg.mount) + est = _current_pos[pipette_ax] + if not api.is_simulator: + _encoder_poses = api.encoder_current_position_ot3(cfg.mount) + enc = _encoder_poses[pipette_ax] + else: + enc = est + _stalled_mm = est - enc + _did_pass = abs(_stalled_mm) < STALL_THRESHOLD_MM + # NOTE: only tests that are required to PASS need to show a results in the file + data = [round(current, 2), round(speed, 2), round(est, 2), round(enc, 2)] + if _includes_result(current, speed, cfg): + data.append(CSVResult.from_bool(_did_pass)) # type: ignore[arg-type] + report( + _get_cs_section_tag(current), + _get_cs_test_tag(current, speed, trial, direction, position), + data, + ) + return _did_pass + + +def _test_direction( + api: SyncHardwareAPI, + cfg: TestConfig, + report: CSVReport, + trial: int, + current: float, + speed: float, + acceleration: float, + direction: str, +) -> bool: + plunger_poses = helpers_ot3.get_plunger_positions_ot3(api, cfg.mount) + top, _, bottom, _ = plunger_poses + # check that encoder/motor align + aligned = _record_plunger_alignment( + api, cfg, report, trial, current, speed, direction, "start" + ) + if not aligned: + return False + # move the plunger + _plunger_target = {"down": bottom, "up": top + 1.0}[direction] + try: + _move_plunger(api, cfg.mount, _plunger_target, speed, current, acceleration) + # check that encoder/motor still align + aligned = _record_plunger_alignment( + api, cfg, report, trial, current, speed, direction, "end" + ) + except StallOrCollisionDetectedError: + aligned = False + _set_default_and_home_plunger(api, cfg.mount) + return aligned + + +def _test_plunger( + ctx: ProtocolContext, + api: SyncHardwareAPI, + cfg: TestConfig, + report: CSVReport, + trials: int, + continue_after_stall: bool, +) -> None: + # start at HIGHEST (easiest) current + currents = sorted(list(PLUNGER_CURRENTS_SPEED.keys()), reverse=False) + for current in currents: + # start at LOWEST (easiest) speed + speeds = sorted(PLUNGER_CURRENTS_SPEED[current], reverse=False) + for speed in speeds: + for trial in range(trials): + ctx.comment( + f"CURRENT = {current}: " + f"SPEED = {speed}: " + f"TRIAL = {trial + 1}/{trials}" + ) + _set_default_and_home_plunger(api, cfg.mount) + for direction in ["down", "up"]: + test_pass = _test_direction( + api, + cfg, + report, + trial, + current, + speed, + TEST_ACCELERATION, + direction, + ) + if not test_pass: + if _includes_result(current, speed, cfg): + raise RuntimeError("Current Speed test failed.") + if continue_after_stall: + break + else: + return + + +def _reset_gantry(api: SyncHardwareAPI) -> None: + api.home( + [ + Axis.Z_L, + Axis.Z_R, + Axis.X, + Axis.Y, + ] + ) + home_pos = api.gantry_position(OT3Mount.RIGHT, CriticalPoint.MOUNT) + test_pos = helpers_ot3.get_slot_calibration_square_position_ot3(5) + test_pos = test_pos._replace(z=home_pos.z) + api.move_to(OT3Mount.RIGHT, test_pos, critical_point=CriticalPoint.MOUNT) + + +def test_current_speed( + api: SyncHardwareAPI, + report: CSVReport, + section: str, + ctx: ProtocolContext, + cfg: TestConfig, +) -> None: + """Run the pipette current speed test.""" + try: + helpers_ot3.enable_stall_detection(api, False) + _reset_gantry(api) + _test_plunger( + ctx=ctx, + api=api, + cfg=cfg, + report=report, + trials=DEFAULT_TRIALS, + continue_after_stall=False, + ) + report.save_to_disk() + report.print_results() + + except Exception as errrrr: + printsig = f"08-01-current-system-error:系统错误,日志:{errrrr}" + FINAL_TEST_FAIL_INFOR.append(printsig) + finally: + helpers_ot3.enable_stall_detection(api, True) + + +# ----------- END TEST Current Speed ----------- + def build_config_csv_lines() -> List[Union[CSVLine, CSVLineRepeating]]: """Build CSV Lines.""" @@ -2163,6 +2446,9 @@ def build_report(test_name: str, cfg: TestConfig) -> CSVReport: CSVSection( title=TestSection.ENCODER_CLEAN.value, lines=build_encoder_csv_lines() ), + ] + + build_current_speed_csv_sections(cfg) + + [ CSVSection( title="PRESSURE-DATA", lines=build_pressure_data_csv_lines(cfg.pipette_channels), @@ -2171,7 +2457,12 @@ def build_report(test_name: str, cfg: TestConfig) -> CSVReport: ) +# Re-order elements in this list to change the order in which they run TESTS = [ + ( + TestSection.CURRENT_SPEED, + test_current_speed, + ), ( TestSection.DIAGNOSTICS, test_diagnostics, @@ -2236,6 +2527,7 @@ def add_parameters(parameters: ParameterContext) -> None: default=False, description=f"When this is true the robot will not test {s.value.lower()}", ) + # TODO: Remove all these options that may or may not be necessary anymore parameters.add_str( display_name="fixture side", variable_name="fixture_side", @@ -2252,7 +2544,7 @@ def add_parameters(parameters: ParameterContext) -> None: minimum=1, maximum=10, default=2, - description="Number of trials to run.", + description="Number of trials to run the droplet test.", ) parameters.add_int( display_name="Aspirate Sample Count", @@ -2377,6 +2669,9 @@ def run(ctx: ProtocolContext) -> None: t_sections = { s: f for s, f in TESTS if not args[f"skip_{s.value.lower().replace('-', '_')}"] } + pipette = api.hardware_pipettes[OT3Mount.LEFT.to_mount()] + assert pipette, "Need to have at least a left pipette." + serial = helpers_ot3.get_pipette_serial_ot3(pipette) config = TestConfig( simulate=ctx.is_simulating(), tests=t_sections, @@ -2395,13 +2690,14 @@ def run(ctx: ProtocolContext) -> None: pipette_volume=1000, mount=OT3Mount.LEFT, trash_loc_counter=0, + pipette_serial=serial, ) test_name = "pipette-assembly-qc-ot3" attach_pos = helpers_ot3.get_slot_calibration_square_position_ot3(5) - current_pos = api.gantry_position(OT3Mount.RIGHT) - api.move_to(OT3Mount.RIGHT, attach_pos._replace(z=current_pos.z)) + current_pos = api.gantry_position(OT3Mount.LEFT) + api.move_to(OT3Mount.LEFT, attach_pos._replace(z=current_pos.z)) _load_labware_locations(config, ctx) pips = {OT3Mount.from_mount(m): p for m, p in api.hardware_pipettes.items() if p} assert pips, "no pipettes attached" From e95096581da9e70905c58da58a1aa6b3c6a84ada Mon Sep 17 00:00:00 2001 From: Ryan Howard Date: Fri, 26 Jun 2026 02:27:37 -0400 Subject: [PATCH 2/2] use this in the gripper current-speed section too --- .../gripper_assembly_qc_ot3.py | 112 +++++++++--------- 1 file changed, 58 insertions(+), 54 deletions(-) diff --git a/hardware-testing/hardware_testing/production_qc_protocols/gripper_assembly_qc_ot3.py b/hardware-testing/hardware_testing/production_qc_protocols/gripper_assembly_qc_ot3.py index b1e0c143117..a67b1e5330f 100644 --- a/hardware-testing/hardware_testing/production_qc_protocols/gripper_assembly_qc_ot3.py +++ b/hardware-testing/hardware_testing/production_qc_protocols/gripper_assembly_qc_ot3.py @@ -211,70 +211,74 @@ def _save_result(tag: str, target_z: float, include_pass_fail: bool) -> bool: report(section, tag, [target_z, z_enc]) return z_aligned - # LOOP THROUGH CURRENTS + SPEEDS - currents = list(CURRENTS_SPEEDS.keys()) - for current in sorted(currents, reverse=True): - speeds = CURRENTS_SPEEDS[current] - for speed in sorted(speeds, reverse=False): - include_pass_fail = current >= MIN_PASS_CURRENT - # HOME - api.home([z_ax]) - home_pos = api.gantry_position(OT3Mount.GRIPPER) - # LOWER CURRENT - helpers_ot3.set_gantry_load_per_axis_current_settings_ot3_sync( - api, z_ax, run_current=current - ) - helpers_ot3.set_gantry_load_per_axis_motion_settings_ot3_sync( - api, z_ax, default_max_speed=speed - ) - api._set_active_current({z_ax: current}) - # MOVE DOWN - _save_result( - _get_mount_test_tag(current, speed, "down", "start"), - target_z=home_pos.z, - include_pass_fail=include_pass_fail, - ) - api.move_rel( - mount, - Point(z=-Z_AXIS_TRAVEL_DISTANCE), - speed=speed, - expect_stalls=True, - ) - down_end_passed = _save_result( - _get_mount_test_tag(current, speed, "down", "end"), - target_z=home_pos.z - Z_AXIS_TRAVEL_DISTANCE, - include_pass_fail=include_pass_fail, - ) - if down_end_passed: - # MOVE UP + try: + helpers_ot3.enable_stall_detection(api, False) + # LOOP THROUGH CURRENTS + SPEEDS + currents = list(CURRENTS_SPEEDS.keys()) + for current in sorted(currents, reverse=True): + speeds = CURRENTS_SPEEDS[current] + for speed in sorted(speeds, reverse=False): + include_pass_fail = current >= MIN_PASS_CURRENT + # HOME + api.home([z_ax]) + home_pos = api.gantry_position(OT3Mount.GRIPPER) + # LOWER CURRENT + helpers_ot3.set_gantry_load_per_axis_current_settings_ot3_sync( + api, z_ax, run_current=current + ) + helpers_ot3.set_gantry_load_per_axis_motion_settings_ot3_sync( + api, z_ax, default_max_speed=speed + ) + api._set_active_current({z_ax: current}) + # MOVE DOWN _save_result( - _get_mount_test_tag(current, speed, "up", "start"), - target_z=home_pos.z - Z_AXIS_TRAVEL_DISTANCE, + _get_mount_test_tag(current, speed, "down", "start"), + target_z=home_pos.z, include_pass_fail=include_pass_fail, ) api.move_rel( mount, - Point(z=Z_AXIS_TRAVEL_DISTANCE), + Point(z=-Z_AXIS_TRAVEL_DISTANCE), speed=speed, expect_stalls=True, ) - up_end_passed = _save_result( - _get_mount_test_tag(current, speed, "up", "end"), - target_z=home_pos.z, + down_end_passed = _save_result( + _get_mount_test_tag(current, speed, "down", "end"), + target_z=home_pos.z - Z_AXIS_TRAVEL_DISTANCE, include_pass_fail=include_pass_fail, ) - else: - up_end_passed = False - # RESET CURRENTS AND HOME - helpers_ot3.set_gantry_load_per_axis_current_settings_ot3_sync( - api, z_ax, run_current=default_z_current - ) - helpers_ot3.set_gantry_load_per_axis_motion_settings_ot3_sync( - api, z_ax, default_max_speed=default_z_speed - ) - api.home([z_ax]) - if not down_end_passed or not up_end_passed and not api.is_simulator: - break + if down_end_passed: + # MOVE UP + _save_result( + _get_mount_test_tag(current, speed, "up", "start"), + target_z=home_pos.z - Z_AXIS_TRAVEL_DISTANCE, + include_pass_fail=include_pass_fail, + ) + api.move_rel( + mount, + Point(z=Z_AXIS_TRAVEL_DISTANCE), + speed=speed, + expect_stalls=True, + ) + up_end_passed = _save_result( + _get_mount_test_tag(current, speed, "up", "end"), + target_z=home_pos.z, + include_pass_fail=include_pass_fail, + ) + else: + up_end_passed = False + # RESET CURRENTS AND HOME + helpers_ot3.set_gantry_load_per_axis_current_settings_ot3_sync( + api, z_ax, run_current=default_z_current + ) + helpers_ot3.set_gantry_load_per_axis_motion_settings_ot3_sync( + api, z_ax, default_max_speed=default_z_speed + ) + api.home([z_ax]) + if not down_end_passed or not up_end_passed and not api.is_simulator: + break + finally: + helpers_ot3.enable_stall_detection(api, False) # ----------------- TEST Probe ----------------