Friday, 3 July 2026

Automate Exadata DBA task using Ansible and Python

Question : How to automate patching activity



Automating Exadata patching involves orchestrating rolling updates across the compute nodes (e.g., updating node 1 while others host databases, then rotating). Use Ansible to orchestrate the workflow (node rotation, service management) and Python (via subprocess or Oracle's patchmgr/dbaascli) to execute pre-checks, apply patches, and verify post-patch states. 
Pre-requisites & Best Practices
  • Rolling Strategy: A Half Rack Exadata typically has 4 compute nodes. You must patch one node at a time while migrating databases and Grid Infrastructure (GI) services to surviving nodes. 
  • Software: Download the latest Exadata Database Server (DB Node) update and the Quarterly Full Stack Patch (QFSDP) from My Oracle Support
  • Cell & Storage Check: Verify cellcli confirms no degraded disks or flash cache.
  • Topology: Exadata X8M Half Node means 4 Database Servers and 7/8 Storage Cells (depending on your specific quarter rack expansion).
  • Backup: Take full RMAN backups of all databases and ensure Grid Infrastructure Management Repository (GIMR) is backed up.
  • Connectivity & SSH: Ensure passwordless SSH is configured from the Ansible control node to the root and oracle users on all Exadata compute nodes. 
  • Exadata Storage Server (Cell) & IB/RoCE Switches: Patching typically involves updating Compute Nodes, Storage Servers, and InfiniBand/RoCE switches.
  • The Driving System: Use a dedicated remote server (a "driving system") or the first Compute Node (e.g., dbadm01) to initiate the updates. The target nodes must be fully passwordless-SSH accessible via the oracle or grid user. 
  • Maintenance Window: Allocate ≈ 4 to 6 hours for a Grid Infrastructure (GI) and Database Release Update (RU) cycle. 
  • Pre-Checks: Ensure patchmgr runs with -toothpick (to verify without applying) before committing to any patch.
  • Run the patchmgr -precheck command to validate firmware versions, RPMs, and network configuration before applying changes.
  • Clusterware Integrity: Run crsctl check crs and crsctl check cluster -all to ensure the cluster is healthy. 
  • Space Utilization: Verify free space on /u01 and /var/tmp partitions where patch staging occurs.
  • Test Cases to Validate:
    • Pre-check Phase: Run patchmgr with -check on all nodes to ensure space and RPM dependencies without making changes.
    • Rolling Mode: Verify that CRS on Node 1 is gracefully stopped and resources relocate to Node 2 before applying patches.
    • Post-patch Validation: Run crsctl check crs and verify that all Databases are running and accessible on the patched nod
  • 2. Basic Knowledge: Native Utilities to Call
    • GI/DB Homes: opatchauto applies GI and DB patches.
    • OS / Firmware: Oracle's patchmgr utility (located in /opt/oracle.SupportTools/patchmgr) orchestrates the patching process for compute node operating systems and Exadata cells. 
Step 1: Python Script for Patch Execution
Save this Python script as exadata_patch.py. It provides functions to trigger pre-checks and run the patch applications via Oracle’s native utilities.
python
import subprocess
import logging
import sys

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def run_command(command):
    """Utility function to execute shell commands and stream output."""
    logging.info(f"Executing: {command}")
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    
    while True:
        output = process.stdout.readline()
        if output == '' and process.poll() is not None:
            break
        if output:
            logging.info(output.strip())
            
    rc = process.poll()
    if rc != 0:
        error = process.stderr.read()
        logging.error(f"Command failed with RC {rc}. Error: {error.strip()}")
        sys.exit(rc)

def run_patch_precheck(node):
    """Executes Exadata patchmgr prechecks."""
    logging.info(f"Running Patch Pre-checks on {node}")
    cmd = f"ssh root@{node} '/opt/oracle.SupportTools/patchmgr/patchmgr --hosts {node} --rolls_check'"
    run_command(cmd)

def apply_patch(node, patch_dir):
    """Executes the actual rolling patch application."""
    logging.info(f"Applying patch on {node}...")
    cmd = f"ssh root@{node} '/opt/oracle.SupportTools/patchmgr/patchmgr --hosts {node} --upgrade --patch_base {patch_dir}'"
    run_command(cmd)

if __name__ == "__main__":
    target_node = "exadata-node1.localdomain"
    patch_location = "/u01/stages/exadata/ru_patch"
    
    # 1. Run Precheck
    run_patch_precheck(target_node)
    
    # 2. Apply Patch
    # apply_patch(target_node, patch_location)
Step 2: Ansible Playbook
This playbook defines the rolling automation workflow. Save as rolling_patch.yml.
yaml
---
- name: Exadata Rolling Patch Automation
  hosts: exadata_nodes
  become: yes
  gather_facts: yes

  tasks:
    - name: 1. Stop Oracle Grid Infrastructure & Databases on Target Node
      ansible.builtin.shell: |
        su - oracle -c "srvctl stop home -s {{ oracle_home }} -n {{ inventory_hostname }}"
      delegate_to: localhost
      vars:
        oracle_home: /u01/app/19.0.0.0/grid

    - name: 2. Execute Python Patch Pre-Check Script
      ansible.builtin.command: python3 /opt/scripts/exadata_patch.py --node {{ inventory_hostname }} --action precheck
      delegate_to: localhost
      register: precheck_out

    - name: 3. Apply Patch via Python
      ansible.builtin.command: python3 /opt/scripts/exadata_patch.py --node {{ inventory_hostname }} --action apply
      when: precheck_out.rc == 0

    - name: 4. Post-Patch Verification (Reboot & Check)
      ansible.builtin.shell: |
        ssh root@{{ inventory_hostname }} 'reboot'
        sleep 300
      delegate_to: localhost

    - name: 5. Verify Cluster State (Wait for node to join cluster)
      ansible.builtin.shell: |
        ssh root@{{ inventory_hostname }} '/u01/app/19.0.0.0/grid/bin/crsctl check has'
      delegate_to: localhost
Test Cases & Validation
Integrate the following validation tests before and after running your automation:
  1. Test Case 1: Clusterware and Database Pre-checks
    • Objective: Ensure the targeted Exadata node is ready to be taken offline.
    • Action: Run crsctl check cluster -all to ensure all other nodes are stable.
    • Validation: No databases or instances belonging to the target node are reported as "OFFLINE" on surviving nodes unexpectedly. 
  2. Test Case 2: Verification of Free Space
    • Objective: Verify adequate space exists in /u01 or /var/log before extracting large patch payloads.
    • Action: Run df -h /u01 via Ansible.
    • Validation: Minimum 15 GB of free space is available. 
  3. Test Case 3: Post-Patch Status and OPatch
    • Objective: Verify the new release update has registered correctly.
    • Action: su - oracle -c "cd $ORACLE_HOME/OPatch && ./opatch lsinventory"
    • Validation: The new bundle patch ID (e.g., \(19.24.0.0.0\)) appears in the installed patch list. [
Useful Oracle References
Refer to official deployment methodologies depending on whether you are patching using patchmgr (for Grid/OS) or dbaascli (if managing Cloud-at-Customer instances)



Question : How to automate on-premises Half Rack RAC Exadata patching uses a hybrid approach using ansible and python

Automating on-premises Half Rack RAC Exadata patching uses a hybrid approach: Python triggers and monitors Exadata's native patchmgr utility, while Ansible orchestrates the nodes, coordinates cluster states, and manages validation. Rolling patches are applied sequentially across your 4 compute nodes. 
1. Python Driving Script (exadata_patch_executor.py)
This script uses the subprocess module to execute patchmgr commands remotely or locally via dcli on the half node. 
python
import subprocess
import json
import sys

# Configuration for Half Rack (4 nodes)
NODES = ["exadb01", "exadb02", "exadb03", "exadb04"]
PATCH_LOC = "/u01/patch/19.0.0.0/RU"

def run_command(command):
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    return {
        "return_code": process.returncode,
        "stdout": stdout.decode('utf-8'),
        "stderr": stderr.decode('utf-8')
    }

def execute_patchmgr(node, action):
    # DCLI/patchmgr integration - assuming patchmgr is configured
    cmd = f"dcli -n {node} 'cd /opt/oracle.SupportTools/patchmgr && ./patchmgr {action} -crshome /u01/app/19.0.0/grid -dbhome /u01/app/oracle/product/19.0.0/dbhome_1'"
    return run_command(cmd)

def main():
    # Pre-patch checks
    for node in NODES:
        result = execute_patchmgr(node, "-check")
        if result["return_code"] != 0:
            print(f"Pre-check failed on {node}: {result['stderr']}")
            sys.exit(1)

    # Rolling patch application
    for node in NODES:
        print(f"Applying patch to {node}...")
        result = execute_patchmgr(node, "-apply")
        if result["return_code"] != 0:
            print(f"Patching failed on {node}: {result['stderr']}")
            sys.exit(1)

    print("Exadata Rolling Patch Applied Successfully.")

if __name__ == "__main__":
    main()
2. Ansible Playbook (patch_orchestration.yml)
This playbook orchestrates the pre-requisite node evictions and the execution of the Python automation framework.
yaml
---
- name: Exadata Rolling Patch Orchestration
  hosts: exadata_db_nodes
  gather_facts: yes
  become: yes

  tasks:
    - name: Pre-check DB Services & CRSD Status
      ansible.builtin.shell: "crsctl check cluster"
      register: crs_status

    - name: Run Python Patching Automation
      ansible.builtin.script:
        cmd: /u01/scripts/exadata_patch_executor.py
      register: patch_execution
      delegate_to: localhost

    - name: Verify Clusterware Post-Patching
      ansible.builtin.shell: "crsctl check crs"
      register: post_crs_check
      retries: 3
      delay: 60
      until: post_crs_check.rc == 0
3. Verification & Test Cases
Execute these validation commands through the Ansible ad-hoc module or standalone to guarantee patch completion:
Test Case 1: Active Node Verification
  • Command: /u01/app/19.0.0/grid/bin/crsctl stat res -t -init
  • Expected Result: Cluster synchronization must show status ONLINE on the targeted Exadata compute node.
Test Case 2: GI (Grid Infrastructure) Patch Level Validation
  • Command: /u01/app/19.0.0/grid/OPatch/opatch lspatches
  • Expected Result: The newly applied Release Update (RU) must successfully populate in the inventory list.
Test Case 3: Opatch Database Verification
  • Command: /u01/app/oracle/product/19.0.0/dbhome_1/OPatch/opatch lspatches
  • Expected Result: All database homes must log as updated without reporting conflicts or rollback failures.

  • Cluster Verification: Run /u01/app/19.0.0/grid/bin/crsctl check cluster -all to verify all clusterware processes are online.
  • Database Status: Run /u01/app/19.0.0/grid/bin/srvctl status database -d <db_name> to verify that all databases are running on their preferred nodes.
  • Patch Inventory: Execute /u01/app/19.0.0/grid/OPatch/opatch lspatches to confirm that the Oracle homes successfully show the applied Release Updates (RU).
  • Log Analysis: Review the exact diagnostic outputs generated during execution under /var/log/cellos/dbnodeupdate.log and the trace files in the patchmgr working directories. 

  • Question : How to automate Storage Metrics & Partition Maintenance

    1. The Python Script (Storage Metrics & Partition Maintenance)
    Save this as dba_maintenance.py. It establishes a connection to the database, checks storage usage, and manages partitions. 
    python
    import sys
    import oracledb
    
    def connect_db():
        # Replace with your actual Exadata connection details
        username = 'system'
        password = 'your_password'
        dsn = '://example.com'
        
        try:
            connection = oracledb.connect(user=username, password=password, dsn=dsn)
            return connection
        except Exception as e:
            print(f"Database connection failed: {e}")
            sys.exit(1)
    
    def drop_old_partitions(connection):
        cursor = connection.cursor()
        # Example: Drop partitions older than 30 days for a sample table
        sql = """
        BEGIN
            EXECUTE IMMEDIATE 'ALTER TABLE sales DROP PARTITION sales_q1_2023';
            COMMIT;
        EXCEPTION
            WHEN OTHERS THEN
                IF SQLCODE != -21402 THEN -- ORA-21402: partition does not exist
                    RAISE;
                END IF;
        END;
        """
        try:
            cursor.execute(sql)
            print("Partition maintenance completed successfully.")
        except Exception as e:
            print(f"Partition drop failed: {e}")
        finally:
            cursor.close()
    
    def get_storage_metrics(connection):
        cursor = connection.cursor()
        # Query Exadata ASM allocation units to find disk group usage
        query = "SELECT name, type, total_mb, free_mb, required_mirror_free_mb FROM v$asm_diskgroup"
        cursor.execute(query)
        
        metrics = []
        for row in cursor:
            metrics.append({
                "disk_group": row[0],
                "type": row[1],
                "total_mb": row[2],
                "free_mb": row[3],
                "required_mirror_free_mb": row[4]
            })
        cursor.close()
        return metrics
    
    if __name__ == "__main__":
        conn = connect_db()
        
        # Task 1: Check Storage
        disk_groups = get_storage_metrics(conn)
        for dg in disk_groups:
            print(f"ASM DG {dg['disk_group']} - Free: {dg['free_mb']} MB out of {dg['total_mb']} MB")
            
        # Task 2: Partition Maintenance
        drop_old_partitions(conn)
        
        conn.close()
    
    2. The Ansible Playbook
    Save this as exadata_dba_tasks.yml. This playbook targets the Exadata database nodes, provisions the Python environment, runs the script, and gathers facts. [1, 2]
    yaml
    ---
    - name: Exadata On-Premise DBA Automation
      hosts: exadata_db_nodes
      become: yes
      become_user: oracle
      vars:
        python_script_path: "/home/oracle/scripts/dba_maintenance.py"
        
      tasks:
        - name: Ensure python-oracledb is installed for the oracle user
          ansible.builtin.pip:
            name: oracledb
            state: present
            executable: pip3
            
        - name: Execute the DBA Maintenance Python Script
          ansible.builtin.command:
            cmd: python3 {{ python_script_path }}
          register: script_output
          changed_when: true
          
        - name: Output Python Execution Results
          ansible.builtin.debug:
            msg: "{{ script_output.stdout_lines }}"
    
        - name: Get Exadata Node Facts (OS level)
          ansible.builtin.setup:
            filter: 'ansible_memtotal_mb'
    
    3. Test Cases (for Unit / Integration Testing)
    To ensure the automation works safely against your Exadata environment, set up the following tests:
    Test 1: Connectivity Verification
    • Objective: Ensure the Ansible controller can reach the Exadata compute nodes and verify the Python oracledb library connects to the database.
    • Command: ansible exadata_db_nodes -m ping and python3 -c "import oracledb"
    • Expected Result: Ansible returns "pong". Python command executes without throwing an ImportError.
    Test 2: Idempotency & Error Handling (Partition Drop)
    • Objective: Ensure the script does not crash if partitions have already been dropped.
    • Action: Run dba_maintenance.py twice consecutively.
    • Expected Result: The first run prints "Partition maintenance completed". The second run catches ORA-21402 (handled gracefully by the EXCEPTION block in the Python script) and prints a safe log rather than aborting.
    Test 3: Threshold Alert Simulation
    • Objective: Validate that the ASM storage query returns accurate metrics.
    • Action: Compare the script's get_storage_metrics output against the V$ASM_DISKGROUP view inside SQL*Plus manually.
    • Expected Result: The disk group names, sizes, and free space values returned by the script exactly match the output of your manual SQL query.

    Before executing automation against critical production Exadata cell 




    Question: How to automate monitoring task in exadata on-premises environment using ansible and python 



    To automate Exadata monitoring, use an Ansible Playbook to run a Python script on your Exadata database nodes. The Python script should use the oracledb module to execute SQL health checks, and psutil for hardware telemetry. The playbook then aggregates metrics and handles alerts. 
    1. Python Script: exadata_monitor.py
    This script connects to the Oracle Database, runs basic performance queries, and evaluates critical metrics (like tablespace usage and invalid objects). 
    python
    import oracledb
    import json
    import sys
    
    # Exadata connection details
    DB_USER = 'system'
    DB_PASSWORD = 'YourAdminPassword'
    DB_DSN = '10.0.0.15:1521/PROD' # Replace with SCAN listener and Service Name
    
    def check_database_health():
        try:
            # Connect to Oracle Database
            connection = oracledb.connect(user=DB_USER, password=DB_PASSWORD, dsn=DB_DSN)
            cursor = connection.cursor()
    
            metrics = {}
    
            # 1. Check Tablespace Usage
            cursor.execute("""
                SELECT tablespace_name, ROUND((used_space * block_size) / 1024 / 1024 / 1024, 2) as used_gb 
                FROM dba_tablespace_usage_metrics 
                WHERE (used_space / tablespace_size) > 0.85
            """)
            metrics['tablespace_alerts'] = cursor.fetchall()
    
            # 2. Check Invalid Objects
            cursor.execute("SELECT COUNT(*) FROM dba_objects WHERE status = 'INVALID'")
            metrics['invalid_objects'] = cursor.fetchone()[0]
    
            # 3. Check Active Sessions
            cursor.execute("SELECT COUNT(*) FROM v$session WHERE status = 'ACTIVE' AND type != 'BACKGROUND'")
            metrics['active_sessions'] = cursor.fetchone()[0]
    
            cursor.close()
            connection.close()
    
            print(json.dumps(metrics))
    
        except Exception as e:
            print(f"Error: {e}")
            sys.exit(1)
    
    if __name__ == "__main__":
        check_database_health()
    
    2. Ansible Playbook: monitor_exadata.yml
    Use Ansible to execute the Python script across all Exadata nodes and capture the JSON output. 
    yaml
    ---
    - name: Automate Exadata Health Monitoring
      hosts: exadata_nodes
      gather_facts: yes
      tasks:
        - name: Ensure required python packages are installed
          ansible.builtin.pip:
            name: 
              - oracledb
              - psutil
            state: present
    
        - name: Run Exadata Monitoring Python script
          ansible.builtin.command: python3 /opt/scripts/exadata_monitor.py
          register: script_output
    
        - name: Parse output and display
          ansible.builtin.debug:
            msg: "{{ script_output.stdout | from_json }}"
    
        - name: Trigger alert if thresholds breached
          ansible.builtin.fail:
            msg: "Alert! Exadata performance threshold exceeded. See debug logs."
          when: >
            (script_output.stdout | from_json).invalid_objects > 10 or
            (script_output.stdout | from_json).tablespace_alerts | length > 0
    
    3. Test Cases
    To validate your monitoring automation, you can run the following local unit and integration tests:
    Test 1: Python Connectivity and Output Test
    Purpose: Ensure the Python script successfully connects to the Exadata database and emits valid JSON.
    • Execution Command: python3 exadata_monitor.py
    • Expected Output: {"tablespace_alerts": [], "invalid_objects": 4, "active_sessions": 12} (Must be a valid JSON dictionary).
    Test 2: Ansible Inventory and Syntax Test
    Purpose: Verify that Ansible syntax is correct and your Exadata hosts are reachable.
    • Execution Command: ansible-playbook monitor_exadata.yml --syntax-check
    • Expected Output: playbook: monitor_exadata.yml (No syntax errors). 
    Test 3: Threshold Trigger Integration Test
    Purpose: Verify that the playbook correctly identifies issues and triggers a failure/alert when a metric crosses the critical threshold.
    • Prerequisite: Create a dummy invalid object in the database: CREATE PROCEDURE test_invalid AS BEGIN NULL; END; / ALTER PROCEDURE test_invalid COMPILE BODY; (Introduce syntax error).
    • Execution Command: ansible-playbook monitor_exadata.yml
    • Expected Output: The playbook halts execution and throws a failure (FAILED! => {"msg": "Alert! Exadata performance threshold exceeded..."}). 
    For More Details


    Question : How to automate Tablespace Monitoring in exadata on-premises environment using ansible and python with example and test cases


    Architecture Overview
    [ Ansible Control Node ] 
           │
           ├── (SSH / dcli) ──► [ Exadata Compute Nodes ] ── (Bash/CLI) ──► Grid/CellOS
           │
           └── (Python Script) ──► [ Oracle Database ] ── (SQL/PLSQL) ──► Tablespace Mod
    
    • Ansible: Manages inventory, system checks, and script execution.
    • Python: Connects to the database to check space and run ALTER TABLESPACE.

    Step 1: Python Automation Script (extend_ts.py)
    This script checks if a tablespace is above a threshold and safely extends it. Save this on your Ansible controller or deploy it to the Exadata compute node.
    python
    import sys
    import oracledb
    
    # Database connection parameters
    DB_USER = "sys"
    DB_PASS = "YourSecureSysPassword"
    DB_DSN = "exadata-scan.localdom:1521/ORCL_TAF"
    DB_MODE = oracledb.AUTH_MODE_SYSDBA
    
    def check_and_extend(tablespace_name, threshold_pct, extend_size_mb):
        try:
            # Initialize python-oracledb in thin mode
            connection = oracledb.connect(user=DB_USER, password=DB_PASS, dsn=DB_DSN, mode=DB_MODE)
            cursor = connection.cursor()
    
            # Query space utilization
            query = """
            SELECT df.tablespace_name, 
                   round(((df.bytes - fs.bytes) / df.bytes) * 100, 2) as used_pct
            FROM (SELECT tablespace_name, SUM(bytes) bytes FROM dba_data_files GROUP BY tablespace_name) df,
                 (SELECT tablespace_name, SUM(bytes) bytes FROM dba_free_space GROUP BY tablespace_name) fs
            WHERE df.tablespace_name = fs.tablespace_name AND df.tablespace_name = :ts_name
            """
            cursor.execute(query, ts_name=tablespace_name.upper())
            row = cursor.fetchone()
    
            if not row:
                print(f"Error: Tablespace {tablespace_name} not found.")
                sys.exit(1)
    
            used_pct = row[1]
            print(f"Tablespace {tablespace_name} usage is currently at {used_pct}%.")
    
            if used_pct > float(threshold_pct):
                print(f"Threshold exceeded ({threshold_pct}%). Adding a new datafile...")
                
                # Exadata best practice: Use OMF (Oracle Managed Files) inside ASM disk groups
                alter_query = f"ALTER TABLESPACE {tablespace_name} ADD DATAFILE SIZE {extend_size_mb}M"
                cursor.execute(alter_query)
                
                print(f"Successfully extended {tablespace_name} by {extend_size_mb}MB.")
            else:
                print("Space is sufficient. No action required.")
    
            cursor.close()
            connection.close()
    
        except oracledb.DatabaseError as e:
            error, = e.args
            print(f"Database error occurred: {error.message}")
            sys.exit(1)
    
    if __name__ == "__main__":
        if len(sys.argv) < 4:
            print("Usage: python extend_ts.py <TS_NAME> <THRESHOLD_PCT> <EXTEND_SIZE_MB>")
            sys.exit(1)
        check_and_extend(sys.argv[1], sys.argv[2], sys.argv[3])
    
     Step 2: Ansible Playbook (exadata_dba_tasks.yml)
    This playbook creates a backup directory, updates the local Oracle environment variables, runs pre-checks, and executes the Python script. 
    yaml
    ---
    - name: Exadata Automated DBA Tablespace Management
      hosts: exadata_compute_nodes
      become: yes
      become_user: oracle
      vars:
        oracle_home: "/u01/app/oracle/product/19.0.0/dbhome_1"
        oracle_sid: "ORCL1"
        target_tablespace: "APP_DATA"
        threshold: 85
        increment_mb: 5120 # 5GB
    
      tasks:
        - name: Pre-check | Verify Oracle Environment
          ansible.builtin.stat:
            path: "{{ oracle_home }}/bin/sqlplus"
          register: sqlplus_file
    
        - name: Fail if Oracle Home is invalid
          ansible.builtin.fail:
            msg: "Oracle Home path is incorrect on this Exadata node."
          when: not sqlplus_file.stat.exists
    
        - name: Execution | Run Python Tablespace Extension Script
          ansible.builtin.command:
            cmd: "python3 /u01/app/oracle/scripts/extend_ts.py {{ target_tablespace }} {{ threshold }} {{ increment_mb }}"
          environment:
            ORACLE_HOME: "{{ oracle_home }}"
            ORACLE_SID: "{{ oracle_sid }}"
            PATH: "{{ oracle_home }}/bin:{{ ansible_env.PATH }}"
          register: script_output
          changed_when: "'Successfully extended' in script_output.stdout"
    
        - name: Output | Log results to console
          ansible.builtin.debug:
            var: script_output.stdout_lines
    
     Step 3: Test Cases & Validation Framework
    To ensure the automation code is robust before executing against Exadata production, run these automated tests inside a lower environment (Dev/Test).
    1. Unit Test File: Mocking Database Responses (test_extend_ts.py)
    Use Python's unittest.mock framework to test your Python logic without actually connecting to an Exadata machine.
    python
    import unittest
    from unittest.mock import patch, MagicMock
    import extend_ts
    
    class TestDBAAutomation(unittest.TestCase):
    
        @patch('oracledb.connect')
        def test_no_action_needed(self, mock_connect):
            # Mocking SQL return value: Used Pct = 50%
            mock_cursor = MagicMock()
            mock_cursor.fetchone.return_value = ('APP_DATA', 50.0)
            mock_connect.return_value.cursor.return_value = mock_cursor
    
            with patch('sys.exit') as mock_exit:
                extend_ts.check_and_extend('APP_DATA', 85, 100)
                # Ensure ALTER TABLESPACE was NOT called
                mock_cursor.execute.assert_called_once() # Only the SELECT query ran
                
        @patch('oracledb.connect')
        def test_tablespace_extension_triggered(self, mock_connect):
            # Mocking SQL return value: Used Pct = 90% (Threshold is 85)
            mock_cursor = MagicMock()
            mock_cursor.fetchone.return_value = ('APP_DATA', 90.0)
            mock_connect.return_value.cursor.return_value = mock_cursor
    
            extend_ts.check_and_extend('APP_DATA', 85, 5120)
            # Verify that the ALTER statement was triggered
            mock_cursor.execute.assert_any_call("ALTER TABLESPACE APP_DATA ADD DATAFILE SIZE 5120M")
    
    if __name__ == '__main__':
        unittest.main()
    
    2. Ansible Dry-Run (Check Mode)
    Before making live changes to Exadata storage, run Ansible in check mode to validate variables, access connectivity, and task sequences: 
    bash
    ansible-playbook -i hosts exadata_dba_tasks.yml --check
    
    3. Infrastructure Negative Integration Tests
    Intentionally force failure scenarios to evaluate script resiliency:
    Test Scenario ActionExpected Behavior
    Invalid Tablespace NamePass WRONG_TS to the playbookScript outputs Tablespace WRONG_TS not found and exits cleanly with code 1. Ansible marks task as failed.
    ASM Diskgroup FullRun when DATA diskgroup is at 100%Oracle yields ORA-01114 / ORA-15041. Python catches DatabaseError, logs the exact ORA error, and exits without crashing.
    Node DownPower off one Compute NodeAnsible targeting that specific host fails gracefully at the setup/gathering facts phase; other nodes continue.

    No comments:

    Post a Comment