Monday, October 20, 2025

Data Guard

-- Builds ALTER TABLE ... ENABLE NOVALIDATE statements for every user-named
-- P/U/R/C constraint in the listed schemas, foreign keys ordered last.
SELECT
    -- Foreign keys ('R') sort after everything else so their parent
    -- PK/UK constraints are enabled first.
    CASE constraint_type WHEN 'R' THEN 2 ELSE 1 END AS sort_order,
    -- Quoted identifiers keep mixed-case / special-character names safe.
    'ALTER TABLE "' || owner || '"."' || table_name
        || '" ENABLE NOVALIDATE CONSTRAINT "' || constraint_name || '";' AS ddl_statement
FROM dba_constraints
WHERE owner IN ('SCHEMA_A', 'SCHEMA_B', 'SCHEMA_C', 'SCHEMA_D', 'SCHEMA_E', 'SCHEMA_F', 'SCHEMA_G')
  -- Skip constraints that are already ENABLED NOVALIDATE.
  AND NOT (status = 'ENABLED' AND validated = 'NOT VALIDATED')
  -- Primary key, unique, foreign key, and check constraints only.
  AND constraint_type IN ('P', 'U', 'R', 'C')
  -- System-generated names are skipped to avoid ORA-31603-style surprises.
  AND constraint_name NOT LIKE 'SYS_C%'
ORDER BY sort_order, table_name, constraint_name;


--


-- File: precheck_sequences.sql
-- Per-schema sequence counts (with names) plus a grand total row.
-- FIX: the original header line started with a single '-' (invalid in
-- SQL*Plus), and LISTAGG could abort the whole report with ORA-01489 on
-- schemas with many sequences; ON OVERFLOW TRUNCATE (12.2+) degrades
-- gracefully instead.
SET LINESIZE 4000
SET PAGESIZE 0
SPOOL precheck_sequences.txt
SELECT 
  sequence_owner AS schema_name,
  COUNT(*) AS sequence_count,
  LISTAGG(sequence_name, ', ' ON OVERFLOW TRUNCATE) WITHIN GROUP (ORDER BY sequence_name) AS sequence_names
FROM all_sequences
WHERE sequence_owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')  -- Replace with your 7 schemas
GROUP BY sequence_owner
UNION ALL
SELECT 
  'TOTAL' AS schema_name,
  COUNT(*) AS sequence_count,
  NULL AS sequence_names
FROM all_sequences
WHERE sequence_owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')
ORDER BY schema_name;
SPOOL OFF

-- File: generate_sequences_increment_by_1.sql
-- Emits CREATE SEQUENCE DDL that rebuilds each sequence at its dictionary
-- high-water mark (last_number) but forces INCREMENT BY 1 regardless of the
-- source setting. All other attributes are carried over.
SET LINESIZE 4000
SET PAGESIZE 0
SPOOL create_sequences_increment_by_1.sql
SELECT
  'CREATE SEQUENCE ' || sequence_owner || '.' || sequence_name
  || ' START WITH ' || last_number
  || ' INCREMENT BY 1'
  -- NOTE(review): ALL_SEQUENCES stores explicit defaults, so max_value /
  -- min_value may never actually be NULL; the NOMAXVALUE/NOMINVALUE branches
  -- would then be dead and the DDL spells out the boundary literals -- confirm
  -- whether that is acceptable for the target.
  || CASE WHEN max_value IS NULL THEN ' NOMAXVALUE' ELSE ' MAXVALUE ' || max_value END
  || CASE WHEN min_value IS NULL THEN ' NOMINVALUE' ELSE ' MINVALUE ' || min_value END
  || CASE cycle_flag   WHEN 'Y' THEN ' CYCLE'   ELSE ' NOCYCLE' END
  || CASE cache_size   WHEN 0   THEN ' NOCACHE' ELSE ' CACHE ' || cache_size END
  || CASE order_flag   WHEN 'Y' THEN ' ORDER'   ELSE ' NOORDER' END
  || CASE keep_value   WHEN 'N' THEN ' NOKEEP'  ELSE ' KEEP' END
  || CASE scale_flag   WHEN 'N' THEN ' NOSCALE' WHEN 'Y' THEN ' SCALE'   ELSE '' END
  || CASE session_flag WHEN 'N' THEN ' GLOBAL'  WHEN 'Y' THEN ' SESSION' ELSE '' END
  || ';' AS create_statement
FROM all_sequences
WHERE sequence_owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')  -- Replace with your 7 schemas
ORDER BY sequence_owner, sequence_name;
SPOOL OFF


-- File: generate_sequences_preserve_all.sql
-- Emits CREATE SEQUENCE DDL that rebuilds each sequence at its dictionary
-- high-water mark (last_number) preserving every attribute, including the
-- original INCREMENT BY.
SET LINESIZE 4000
SET PAGESIZE 0
SPOOL create_sequences_preserve_all.sql
SELECT
  'CREATE SEQUENCE ' || sequence_owner || '.' || sequence_name
  || ' START WITH ' || last_number
  || ' INCREMENT BY ' || increment_by
  -- NOTE(review): ALL_SEQUENCES stores explicit defaults, so max_value /
  -- min_value may never actually be NULL; the NOMAXVALUE/NOMINVALUE branches
  -- would then be dead and the DDL spells out the boundary literals -- confirm
  -- whether that is acceptable for the target.
  || CASE WHEN max_value IS NULL THEN ' NOMAXVALUE' ELSE ' MAXVALUE ' || max_value END
  || CASE WHEN min_value IS NULL THEN ' NOMINVALUE' ELSE ' MINVALUE ' || min_value END
  || CASE cycle_flag   WHEN 'Y' THEN ' CYCLE'   ELSE ' NOCYCLE' END
  || CASE cache_size   WHEN 0   THEN ' NOCACHE' ELSE ' CACHE ' || cache_size END
  || CASE order_flag   WHEN 'Y' THEN ' ORDER'   ELSE ' NOORDER' END
  || CASE keep_value   WHEN 'N' THEN ' NOKEEP'  ELSE ' KEEP' END
  || CASE scale_flag   WHEN 'N' THEN ' NOSCALE' WHEN 'Y' THEN ' SCALE'   ELSE '' END
  || CASE session_flag WHEN 'N' THEN ' GLOBAL'  WHEN 'Y' THEN ' SESSION' ELSE '' END
  || ';' AS create_statement
FROM all_sequences
WHERE sequence_owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')  -- Replace with your 7 schemas
ORDER BY sequence_owner, sequence_name;
SPOOL OFF

-- File: postcheck_sequences.sql
-- Post-migration per-schema sequence counts (with names) plus a grand total.
-- FIX: LISTAGG could abort the whole report with ORA-01489 on schemas with
-- many sequences; ON OVERFLOW TRUNCATE (12.2+) degrades gracefully instead.
SET LINESIZE 4000
SET PAGESIZE 0
SPOOL postcheck_sequences.txt
SELECT 
  sequence_owner AS schema_name,
  COUNT(*) AS sequence_count,
  LISTAGG(sequence_name, ', ' ON OVERFLOW TRUNCATE) WITHIN GROUP (ORDER BY sequence_name) AS sequence_names
FROM all_sequences
WHERE sequence_owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')  -- Replace with your 7 schemas
GROUP BY sequence_owner
UNION ALL
SELECT 
  'TOTAL' AS schema_name,
  COUNT(*) AS sequence_count,
  NULL AS sequence_names
FROM all_sequences
WHERE sequence_owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')
ORDER BY schema_name;
SPOOL OFF



-- ===

-- Combined pre-check: sequence and constraint counts per schema plus totals.
-- FIX: both LISTAGGs could abort the report with ORA-01489 when a schema has
-- many objects; ON OVERFLOW TRUNCATE (12.2+) truncates the name list instead.
SET LINESIZE 4000
SET PAGESIZE 0
SPOOL precheck_sequences_and_constraints.txt
-- Sequences Pre-Check
SELECT 
  'SEQUENCES,' || sequence_owner AS schema_name,
  COUNT(*) AS object_count,
  LISTAGG(sequence_name, ', ' ON OVERFLOW TRUNCATE) WITHIN GROUP (ORDER BY sequence_name) AS object_names
FROM all_sequences
WHERE sequence_owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')  -- Replace with your 7 schemas
GROUP BY sequence_owner
UNION ALL
SELECT 
  'SEQUENCES,TOTAL' AS schema_name,
  COUNT(*) AS object_count,
  NULL AS object_names
FROM all_sequences
WHERE sequence_owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')
UNION ALL
-- Constraints Pre-Check
SELECT 
  'CONSTRAINTS,' || owner AS schema_name,
  COUNT(*) AS object_count,
  LISTAGG(constraint_name || ' (' || constraint_type || ')', ', ' ON OVERFLOW TRUNCATE) WITHIN GROUP (ORDER BY constraint_name) AS object_names
FROM all_constraints
WHERE owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')
  AND constraint_type IN ('P', 'U', 'R', 'C')  -- Primary, Unique, Foreign, Check
GROUP BY owner
UNION ALL
SELECT 
  'CONSTRAINTS,TOTAL' AS schema_name,
  COUNT(*) AS object_count,
  NULL AS object_names
FROM all_constraints
WHERE owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')
  AND constraint_type IN ('P', 'U', 'R', 'C')
ORDER BY schema_name;
SPOOL OFF



-- Combined post-check: counts and object names in one result set.
-- BUG FIX: the name branches project VARCHAR2 into the same column position
-- where the count branches project NUMBER; UNION ALL requires matching
-- datatypes (ORA-01790). Every branch now projects object_count as a string
-- via TO_CHAR, which also makes the mixed count/name ORDER BY well-defined.
SET LINESIZE 4000
SET PAGESIZE 0
SPOOL postcheck_sequences_and_constraints.txt
-- Sequence Counts and Names
SELECT 
  'SEQUENCES,' || sequence_owner AS schema_name,
  TO_CHAR(COUNT(*)) AS object_count
FROM all_sequences
WHERE sequence_owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')
GROUP BY sequence_owner
UNION ALL
SELECT 
  'SEQUENCES,TOTAL' AS schema_name,
  TO_CHAR(COUNT(*)) AS object_count
FROM all_sequences
WHERE sequence_owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')
UNION ALL
SELECT 
  'SEQUENCES,' || sequence_owner AS schema_name,
  sequence_name AS object_count
FROM all_sequences
WHERE sequence_owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')
UNION ALL
-- Constraint Counts and Names
SELECT 
  'CONSTRAINTS,' || owner AS schema_name,
  TO_CHAR(COUNT(*)) AS object_count
FROM all_constraints
WHERE owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')
  AND constraint_type IN ('P', 'U', 'R', 'C')
GROUP BY owner
UNION ALL
SELECT 
  'CONSTRAINTS,TOTAL' AS schema_name,
  TO_CHAR(COUNT(*)) AS object_count
FROM all_constraints
WHERE owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')
  AND constraint_type IN ('P', 'U', 'R', 'C')
UNION ALL
SELECT 
  'CONSTRAINTS,' || owner AS schema_name,
  constraint_name || ' (' || constraint_type || ')' AS object_count
FROM all_constraints
WHERE owner IN ('SCHEMA1', 'SCHEMA2', 'SCHEMA3', 'SCHEMA4', 'SCHEMA5', 'SCHEMA6', 'SCHEMA7')
  AND constraint_type IN ('P', 'U', 'R', 'C')
ORDER BY schema_name, object_count;
SPOOL OFF

-- =============

-- Validation report for the INCREMENT BY 1 sequence migration: prints one
-- pipe-delimited row per sequence in the source schema showing its current
-- attributes plus the CREATE SEQUENCE statement that would recreate it.
DECLARE
  -- All sequences owned by the source schema, with the attributes needed to
  -- rebuild them. keep_value / scale_flag / session_flag only exist in newer
  -- (12c+) ALL_SEQUENCES views -- TODO confirm the database version.
  CURSOR csr IS
    SELECT sequence_name,
           sequence_owner AS schema_name,
           last_number,
           min_value,
           max_value,
           increment_by,  -- selected but not used below: DDL forces INCREMENT BY 1
           cycle_flag,
           order_flag,
           cache_size,
           keep_value,
           scale_flag,
           session_flag
    FROM all_sequences
    WHERE sequence_owner = 'YOUR_SCHEMA';  -- Replace with your source schema
  -- Cross-check: expected number of sequences vs. rows printed by the loop.
  v_total_sequences NUMBER;
BEGIN
  -- Count total sequences for validation
  SELECT COUNT(*) INTO v_total_sequences
  FROM all_sequences
  WHERE sequence_owner = 'YOUR_SCHEMA';
  
  -- Output header for validation report
  DBMS_OUTPUT.PUT_LINE('Total sequences found in schema ' || 'YOUR_SCHEMA' || ': ' || v_total_sequences);
  DBMS_OUTPUT.PUT_LINE('Schema | Sequence Name | Start With | Increment By | Min Value | Max Value | Cycle | Order | Cache | Keep | Scale | Session | Create Statement');
  DBMS_OUTPUT.PUT_LINE('-------|---------------|------------|--------------|-----------|-----------|-------|-------|-------|------|-------|--------|-----------------');
  
  FOR rec IN csr LOOP
    -- One row per sequence. The "Increment By" column is the literal '1'
    -- because the generated DDL forces INCREMENT BY 1; the source
    -- increment_by is deliberately not displayed.
    DBMS_OUTPUT.PUT_LINE(
      rec.schema_name || ' | ' || 
      rec.sequence_name || ' | ' || 
      rec.last_number || ' | ' || 
      '1' || ' | ' || 
      NVL(TO_CHAR(rec.min_value), 'NULL') || ' | ' || 
      NVL(TO_CHAR(rec.max_value), 'NULL') || ' | ' || 
      rec.cycle_flag || ' | ' || 
      rec.order_flag || ' | ' || 
      rec.cache_size || ' | ' || 
      rec.keep_value || ' | ' || 
      NVL(rec.scale_flag, 'N/A') || ' | ' || 
      rec.session_flag || ' | ' ||
      -- Generated DDL starts at the dictionary high-water mark (last_number).
      -- NOTE(review): if the source database keeps taking writes after this
      -- report is captured, add a safety buffer to START WITH before replay.
      'CREATE SEQUENCE ' || rec.schema_name || '.' || rec.sequence_name || 
      ' START WITH ' || rec.last_number || 
      ' INCREMENT BY 1' || 
      CASE WHEN rec.max_value IS NULL THEN ' NOMAXVALUE' ELSE ' MAXVALUE ' || rec.max_value END || 
      CASE WHEN rec.min_value IS NULL THEN ' NOMINVALUE' ELSE ' MINVALUE ' || rec.min_value END || 
      CASE WHEN rec.cycle_flag = 'Y' THEN ' CYCLE' ELSE ' NOCYCLE' END || 
      CASE WHEN rec.cache_size = 0 THEN ' NOCACHE' ELSE ' CACHE ' || rec.cache_size END || 
      CASE WHEN rec.order_flag = 'Y' THEN ' ORDER' ELSE ' NOORDER' END || 
      CASE WHEN rec.keep_value = 'N' THEN ' NOKEEP' ELSE ' KEEP' END || 
      CASE WHEN rec.scale_flag = 'N' THEN ' NOSCALE' WHEN rec.scale_flag = 'Y' THEN ' SCALE' ELSE '' END || 
      CASE WHEN rec.session_flag = 'N' THEN ' GLOBAL' WHEN rec.session_flag = 'Y' THEN ' SESSION' ELSE '' END || ';'
    );
  END LOOP;
  
  -- Final instruction
  DBMS_OUTPUT.PUT_LINE(' ');
  DBMS_OUTPUT.PUT_LINE('Copy the CREATE SEQUENCE statements above and execute them in the target database.');
  DBMS_OUTPUT.PUT_LINE('Ensure the schema ' || '''YOUR_SCHEMA''' || ' exists in the target database.');
  DBMS_OUTPUT.PUT_LINE('Verify that ' || v_total_sequences || ' sequences are created in the target database.');
EXCEPTION
  -- Report and re-raise so SQL*Plus / callers still see the failure.
  WHEN OTHERS THEN
    DBMS_OUTPUT.PUT_LINE('Error: ' || SQLERRM);
    RAISE;
END;
/
-- =======

-- Data Guard overview: database identity, role, protection settings, and the
-- state of the managed-recovery (apply) process.
-- BUG FIX: V$DATAGUARD_STATUS has no STATUS/PROCESS/ERROR columns (the
-- original join raised ORA-00904); apply state comes from the MRP slave in
-- V$MANAGED_STANDBY, and the last error from V$DATAGUARD_STATUS.MESSAGE.
-- Also removed the identity CASE on database_role (it mapped every value to
-- itself).
-- NOTE(review): V$MANAGED_STANDBY is deprecated in 12.2+ in favor of
-- V$DATAGUARD_PROCESS -- confirm target version.
SELECT
    d.name AS db_name,
    d.db_unique_name,
    d.open_mode,
    d.database_role,
    CASE d.protection_mode
        WHEN 'MAXIMUM PROTECTION'   THEN 'MAX PROTECTION'
        WHEN 'MAXIMUM AVAILABILITY' THEN 'MAX AVAILABILITY'
        WHEN 'MAXIMUM PERFORMANCE'  THEN 'MAX PERFORMANCE'
        ELSE d.protection_mode
    END AS protection_mode,
    d.protection_level,
    d.remote_archive,
    d.flashback_on,
    d.log_mode,
    NVL(m.status, 'MRP NOT RUNNING') AS apply_status,
    m.process AS apply_process,
    -- Most recent Error/Fatal message logged by Data Guard, if any.
    (SELECT MAX(message) KEEP (DENSE_RANK LAST ORDER BY timestamp)
       FROM v$dataguard_status
      WHERE severity IN ('Error', 'Fatal')) AS last_error
FROM v$database d
LEFT JOIN (
    -- At most one managed-recovery (MRP) row.
    SELECT process, status
    FROM v$managed_standby
    WHERE process LIKE 'MRP%'
      AND ROWNUM = 1
) m ON 1 = 1;

-- Temp tablespace usage: total / used / free MB and used % per temporary
-- tablespace.
-- FIXES: (1) NVL on used_blocks -- with no active sort segment the outer join
-- returned NULL for Used/Free/%, not 0; (2) ANSI joins replace the comma-join
-- plus (+) mix; (3) ORDER BY now sorts the numeric ratio, not the formatted
-- string.
SELECT
    t.tablespace_name,
    TO_CHAR(t.total_blocks * b.block_size / 1024 / 1024, '999,999.00') AS "Total Size (MB)",
    TO_CHAR(NVL(SUM(s.used_blocks), 0) * b.block_size / 1024 / 1024, '999,999.00') AS "Used Size (MB)",
    TO_CHAR((t.total_blocks - NVL(SUM(s.used_blocks), 0)) * b.block_size / 1024 / 1024, '999,999.00') AS "Free Size (MB)",
    TO_CHAR(ROUND(NVL(SUM(s.used_blocks), 0) / t.total_blocks * 100, 2), '999.00') AS "Used (%)"
FROM (
        -- Total allocated blocks per temp tablespace.
        SELECT tablespace_name, SUM(blocks) AS total_blocks
        FROM dba_temp_files
        GROUP BY tablespace_name
     ) t
INNER JOIN dba_tablespaces b
    ON b.tablespace_name = t.tablespace_name
LEFT JOIN v$sort_segment s
    ON s.tablespace_name = t.tablespace_name
GROUP BY t.tablespace_name, t.total_blocks, b.block_size
ORDER BY NVL(SUM(s.used_blocks), 0) / t.total_blocks DESC;


-- Sessions currently consuming temp space, largest consumers first.
-- FIX: comma joins replaced with explicit ANSI INNER JOINs (same predicates,
-- same result).
SELECT
    s.sid,
    s.serial#,
    s.username,
    s.osuser,
    s.program,
    t.blocks * b.block_size / 1024 / 1024 AS "MB Used",
    t.tablespace
FROM v$tempseg_usage t
INNER JOIN v$session s
    ON s.saddr = t.session_addr
INNER JOIN dba_tablespaces b
    ON b.tablespace_name = t.tablespace
ORDER BY "MB Used" DESC;


-- Archive-log receive vs. apply gap per redo thread.
-- FIXES: (1) removed the redundant GROUP BY / MAX around subqueries that
-- already return the newest row per thread; (2) replaced the comma join with
-- an ANSI join; (3) removed the stray leading spaces in the CRITICAL/HIGH
-- labels (likely residue of removed symbols).
-- NOTE(review): if two logs per thread share the same MAX(first_time), each
-- subquery can still return more than one row per thread -- confirm acceptable.
SELECT
    arch.thread# AS "Thread",
    arch.sequence# AS "Last Received",
    appl.sequence# AS "Last Applied",
    arch.sequence# - appl.sequence# AS "Seq Diff",
    ROUND((SYSDATE - arch.first_time) * 1440, 1) AS "Time Lag (min)",
    CASE
        WHEN arch.sequence# - appl.sequence# > 10 THEN 'CRITICAL'
        WHEN arch.sequence# - appl.sequence# > 5  THEN 'HIGH'
        ELSE 'OK'
    END AS "Status"
FROM (
        -- Newest archived (received) log per thread.
        SELECT thread#, sequence#, first_time
        FROM v$archived_log
        WHERE (thread#, first_time) IN (
            SELECT thread#, MAX(first_time)
            FROM v$archived_log
            GROUP BY thread#
        )
     ) arch
INNER JOIN (
        -- Newest log-history (applied) entry per thread.
        SELECT thread#, sequence#
        FROM v$log_history
        WHERE (thread#, first_time) IN (
            SELECT thread#, MAX(first_time)
            FROM v$log_history
            GROUP BY thread#
        )
     ) appl
    ON appl.thread# = arch.thread#
ORDER BY arch.thread#;

Sunday, October 19, 2025

Validation

 


-- Top 10 tables by dictionary row estimate, with segment size.
-- FIX: NULLS LAST on the inner ORDER BY -- under DESC, Oracle sorts NULLs
-- first, so tables with stale/missing stats (num_rows IS NULL) would have
-- crowded out the real top 10.
SELECT
    owner,
    table_name,
    TO_CHAR(num_rows, '999,999,999,999') AS dictionary_row_estimate,
    TO_CHAR(data_mb, '999,999.00') AS size_mb
FROM
    (
        SELECT
            t.owner,
            t.table_name,
            t.num_rows,
            (s.bytes / 1024 / 1024) AS data_mb
        FROM
            all_tables t
        JOIN
            dba_segments s ON t.owner = s.owner AND t.table_name = s.segment_name
        WHERE
            t.owner IN ('HR', 'SALES', 'FINANCE') -- ADJUST SCHEMAS IF NECESSARY
            AND t.table_name NOT LIKE 'BIN$%'     -- skip recycle-bin objects
            AND s.segment_type = 'TABLE'          -- non-partitioned tables only
        ORDER BY
            t.num_rows DESC NULLS LAST
    )
WHERE
    ROWNUM <= 10;


-- Verified top-10 report: dictionary estimate vs. the live count captured by
-- the parallel counting system, with status and timestamp.
-- FIX: NULLS LAST on the ORDER BY -- tables with stale stats (NULL num_rows)
-- would otherwise sort first under DESC and crowd out the real top 10.
SELECT
    'Verified Top 10 Report' AS report_title,
    t.owner AS schema_name,
    t.table_name,
    -- 1. Dictionary estimate (often stale)
    TO_CHAR(t.num_rows, '999,999,999,999') AS dictionary_estimate,
    -- 2. Verified live count (from the parallel counting system)
    TO_CHAR(l.row_count, '999,999,999,999') AS verified_live_count,
    -- 3. Status and timestamp
    c.status AS final_status,
    TO_CHAR(l.count_timestamp, 'YYYY-MM-DD HH24:MI:SS') AS verification_time
FROM
    all_tables t
JOIN
    dba_segments s ON t.owner = s.owner AND t.table_name = s.segment_name
LEFT JOIN
    row_count_control c ON t.owner = c.schema_name AND t.table_name = c.table_name
LEFT JOIN
    row_counts_scheduler_log l ON t.owner = l.schema_name AND t.table_name = l.table_name
WHERE
    t.owner IN ('HR', 'SALES', 'FINANCE')
    AND s.segment_type = 'TABLE'
ORDER BY
    t.num_rows DESC NULLS LAST -- keep the largest tables at the top
FETCH NEXT 10 ROWS ONLY;

-- Overall completion snapshot from the summary view.
SELECT
    '3. Overall Project Completion' AS report_title,
    successful_count AS tables_processed_successfully,
    failed_count     AS tables_requiring_dba_fix,
    pct_complete || '%' AS completion_rate
FROM row_count_summary;


-- =====

SET SERVEROUTPUT ON SIZE UNLIMITED
WHENEVER SQLERROR EXIT FAILURE

-- Monitoring report: overall dashboard, per-schema breakdown, and a sample
-- three-way count validation, all printed via DBMS_OUTPUT.
-- BUG FIXES:
--   * c_schema_status now selects COUNT(*): the original loop referenced
--     r_schema.pending_count, which the cursor never defined (PLS-00302).
--   * The summary-view loop used pending_count / last_completion_time, which
--     the view DDL elsewhere in this file does not define (it exposes
--     success_count, failed_count, running_count, last_completion); pending is
--     now derived and last_completion is used directly.
--   * Removed the unused v_output_line local.
DECLARE
    -- Overall dashboard figures (Query 1)
    v_total_tasks  NUMBER;
    v_pct_complete NUMBER;

    -- Query 2: per-schema breakdown with an explicit total per schema.
    CURSOR c_schema_status IS
        SELECT
            schema_name,
            COUNT(*) AS total_count,
            SUM(CASE WHEN status = 'COMPLETE' THEN 1 ELSE 0 END) AS done_count,
            SUM(CASE WHEN status = 'FAILED' THEN 1 ELSE 0 END) AS failed_count,
            ROUND(SUM(CASE WHEN status = 'COMPLETE' THEN 1 ELSE 0 END) / COUNT(*) * 100, 1) AS pct_done
        FROM
            row_count_control
        GROUP BY
            schema_name
        ORDER BY
            pct_done DESC;

    -- Query 4: sample of tables for three-way count validation.
    -- NOTE(review): ROWNUM <= 10 is applied before ORDER BY, so this is an
    -- arbitrary sample of 10, not the alphabetically first 10 -- acceptable
    -- for a spot check.
    CURSOR c_validation_check IS
        SELECT
            t.owner AS schema_name,
            t.table_name,
            TO_CHAR(t.num_rows, '999,999,999,999') AS estimate,
            TO_CHAR(l.row_count, '999,999,999,999') AS live_count,
            c.status AS final_status
        FROM
            all_tables t
        LEFT JOIN
            row_count_control c ON t.owner = c.schema_name AND t.table_name = c.table_name
        LEFT JOIN
            row_counts_scheduler_log l ON t.owner = l.schema_name AND t.table_name = l.table_name
        WHERE
            t.owner IN ('HR', 'SALES', 'FINANCE')
            AND ROWNUM <= 10
        ORDER BY
            t.table_name;

BEGIN
    DBMS_OUTPUT.PUT_LINE(CHR(10) || '========================================================================');
    DBMS_OUTPUT.PUT_LINE('  I. OVERALL SYSTEM HEALTH DASHBOARD');
    DBMS_OUTPUT.PUT_LINE('========================================================================');

    -- Query 1: overall dashboard (using the summary view)
    SELECT total_tasks, pct_complete
    INTO v_total_tasks, v_pct_complete
    FROM row_count_summary;

    DBMS_OUTPUT.PUT_LINE(RPAD('Total Workload Tasks:', 30) || TO_CHAR(v_total_tasks, '999,999'));
    DBMS_OUTPUT.PUT_LINE(RPAD('Overall Completion Rate:', 30) || TO_CHAR(v_pct_complete, '990.0') || '%');

    -- Full status line from the view; pending is derived because the view
    -- does not expose a pending_count column.
    FOR r IN (SELECT * FROM row_count_summary) LOOP
        DBMS_OUTPUT.PUT_LINE(RPAD('  Success Count:', 30) || TO_CHAR(r.success_count, '999,999'));
        DBMS_OUTPUT.PUT_LINE(RPAD('  Pending Count (Remaining):', 30) ||
            TO_CHAR(r.total_tasks - r.success_count - r.failed_count - r.running_count, '999,999'));
        DBMS_OUTPUT.PUT_LINE(RPAD('  Failed Count (Requires DBA):', 30) || TO_CHAR(r.failed_count, '999'));
        DBMS_OUTPUT.PUT_LINE(RPAD('  Last Completion Time:', 30) || TO_CHAR(r.last_completion, 'HH24:MI:SS'));
    END LOOP;

    DBMS_OUTPUT.PUT_LINE(CHR(10) || '========================================================================');
    DBMS_OUTPUT.PUT_LINE('  II. STATUS BREAKDOWN PER SCHEMA');
    DBMS_OUTPUT.PUT_LINE('========================================================================');

    -- Query 2: status breakdown per schema
    DBMS_OUTPUT.PUT_LINE(RPAD('SCHEMA', 15) || RPAD('TOTAL_TABLES', 15) || RPAD('COMPLETE', 10) || RPAD('FAILED', 10) || RPAD('%_DONE', 8));
    DBMS_OUTPUT.PUT_LINE(RPAD('-', 58, '-'));

    FOR r_schema IN c_schema_status LOOP
        DBMS_OUTPUT.PUT_LINE(
            RPAD(r_schema.schema_name, 15) ||
            RPAD(TO_CHAR(r_schema.total_count, '999,999'), 15) ||
            RPAD(TO_CHAR(r_schema.done_count, '999,999'), 10) ||
            RPAD(TO_CHAR(r_schema.failed_count, '999'), 10) ||
            RPAD(TO_CHAR(r_schema.pct_done, '990.0'), 8)
        );
    END LOOP;

    DBMS_OUTPUT.PUT_LINE(CHR(10) || '========================================================================');
    DBMS_OUTPUT.PUT_LINE('  III. THREE-WAY TABLE COUNT VALIDATION (SAMPLE)');
    DBMS_OUTPUT.PUT_LINE('========================================================================');
    DBMS_OUTPUT.PUT_LINE(RPAD('SCHEMA.TABLE', 30) || RPAD('ALL_TABLES (EST)', 20) || RPAD('LIVE_COUNT', 20) || RPAD('STATUS', 10));
    DBMS_OUTPUT.PUT_LINE(RPAD('-', 80, '-'));

    -- Query 4: detailed table validation
    FOR r_val IN c_validation_check LOOP
        DBMS_OUTPUT.PUT_LINE(
            RPAD(r_val.schema_name || '.' || r_val.table_name, 30) ||
            RPAD(r_val.estimate, 20) ||
            RPAD(r_val.live_count, 20) ||
            RPAD(r_val.final_status, 10)
        );
    END LOOP;

    DBMS_OUTPUT.PUT_LINE(CHR(10) || '--- END OF MONITORING REPORT ---');

EXCEPTION
    WHEN OTHERS THEN
        DBMS_OUTPUT.PUT_LINE('!!! FATAL ERROR DURING MONITORING !!! SQLERRM: ' || SQLERRM);
        -- Read-only process: nothing to roll back.
END;
/

-- ============

1. Overall System Health Dashboard (The Quick Status)

This query gives you the overall progress, success rate, and active job counts from the summary view.

SQL
-- Quick overall-progress snapshot from the ROW_COUNT_SUMMARY view.
-- NOTE(review): the column names used here (successful_count, pending_count,
-- last_completion_time) must match the deployed view; the view DDL elsewhere
-- in this file exposes success_count / last_completion and no pending_count
-- column -- confirm which version of the view is actually live before running.
SELECT
    '1. OVERALL SYSTEM STATUS' AS metric_group,
    t.total_tasks,
    t.successful_count AS tasks_complete,
    t.pending_count AS tasks_waiting,
    t.running_count AS tasks_active,
    t.failed_count AS tasks_failed,
    t.pct_complete || '%' AS completion_rate,
    TO_CHAR(t.last_completion_time, 'YYYY-MM-DD HH24:MI:SS') AS last_success_time
FROM
    row_count_summary t;

2. Status Breakdown Per Schema (Workload Health)

This query uses the row_count_control table to show which schemas are complete, failed, or still pending work.

SQL
-- Per-schema workload health from the control table: totals, completed,
-- failed, pending, and the completion percentage.
SELECT
    schema_name,
    COUNT(*) AS total_tables,
    COUNT(CASE WHEN status = 'COMPLETE' THEN 1 END) AS done_count,
    COUNT(CASE WHEN status = 'FAILED'   THEN 1 END) AS failed_count,
    COUNT(CASE WHEN status = 'PENDING'  THEN 1 END) AS pending_count,
    ROUND(COUNT(CASE WHEN status = 'COMPLETE' THEN 1 END) * 100 / COUNT(*), 1) AS pct_done
FROM row_count_control
GROUP BY schema_name
ORDER BY pct_done DESC, schema_name;

3. Detailed Failed Task Report (Troubleshooting)

This query pinpoints every task currently marked as FAILED and shows the exact error message that caused the stall.

SQL
-- Every task currently marked FAILED, with its error message; most recent
-- failures first for fast triage.
SELECT
    '3. FAILED TASK REPORT' AS metric_group,
    job_partition_id AS batch_id,
    schema_name,
    table_name,
    error_message,
    TO_CHAR(end_time, 'YYYY-MM-DD HH24:MI:SS') AS failure_time
FROM row_count_control
WHERE status = 'FAILED'
ORDER BY end_time DESC;

4. Three-Way Table Count Validation (Accuracy Check)

This query proves the integrity of your system by comparing Oracle's stale estimate (ALL_TABLES) against your verified, live count (row_counts_scheduler_log) for a sample of tables.

SQL
-- Three-way count validation sample: dictionary estimate vs. verified live
-- count for tasks in the control queue.
-- BUG FIX: the original LEFT JOINed row_count_control and then filtered on
-- c.status in WHERE, which silently converted the outer join into an inner
-- join. Since the intent is "tasks in the control queue", the control table
-- now drives the query with an explicit INNER JOIN to ALL_TABLES.
SELECT
    '4. LIVE COUNT VALIDATION (SAMPLE)' AS metric_group,
    c.schema_name,
    c.table_name,
    -- 1. Source estimate (unverified, possibly stale)
    TO_CHAR(t.num_rows, '999,999,999,999') AS all_tables_estimate,
    -- 2. Verified live count (final result)
    TO_CHAR(l.row_count, '999,999,999,999') AS log_table_live_count,
    -- 3. Control status
    c.status AS final_control_status
FROM
    row_count_control c
INNER JOIN
    all_tables t ON t.owner = c.schema_name AND t.table_name = c.table_name
LEFT JOIN
    row_counts_scheduler_log l ON l.schema_name = c.schema_name AND l.table_name = c.table_name
WHERE
    c.status IN ('COMPLETE', 'PENDING', 'RUNNING') -- tasks in the control queue
ORDER BY
    c.status DESC, c.schema_name, c.table_name
FETCH NEXT 10 ROWS ONLY; -- adjust the sample size as needed

test

SET SERVEROUTPUT ON SIZE UNLIMITED
WHENEVER SQLERROR EXIT FAILURE

DECLARE
    -- Configuration variables
    c_batch_size        CONSTANT NUMBER := 40;  
    c_job_prefix        CONSTANT VARCHAR2(15) := 'BATCH_COUNT_';

    -- Runtime Variables
    v_total_source_count NUMBER;
    v_max_batches_needed NUMBER; -- 🌟 FIX: Holds the actual highest partition ID
    v_job_name           VARCHAR2(128);
    v_ddl_statement      VARCHAR2(4000);
    v_total_submitted    NUMBER := 0;
    v_pending_in_batch   NUMBER; 

    -- Helper procedure to execute DDL safely (defined to avoid compile errors on missing objects)
    PROCEDURE execute_ddl(p_sql IN VARCHAR2) IS
    BEGIN
        EXECUTE IMMEDIATE p_sql;
    EXCEPTION
        WHEN OTHERS THEN
            -- Ignore common errors (including ORA-27475: unknown job)
            IF SQLCODE NOT IN (-942, -4043, -27475) THEN
                RAISE; 
            END IF;
    END;
    
    -- Helper function to check for object existence
    FUNCTION object_exists(p_object_name IN VARCHAR2) RETURN BOOLEAN IS
        v_exists NUMBER;
    BEGIN
        SELECT COUNT(*) INTO v_exists FROM user_objects WHERE object_name = UPPER(p_object_name);
        RETURN v_exists > 0;
    END;
    
    -- P1. Initializes DDL and Structural Integrity
    PROCEDURE initialize_system IS
    BEGIN
        DBMS_OUTPUT.PUT_LINE('--- PHASES 0 & 1: PERFORMING DESTRUCTIVE FIRST-TIME SETUP ---');
        
        -- Cleanup Phase
        execute_ddl('DROP VIEW row_count_summary');
        execute_ddl('DROP TABLE row_count_control CASCADE CONSTRAINTS');
        execute_ddl('DROP TABLE row_counts_scheduler_log CASCADE CONSTRAINTS'); 
        execute_ddl('DROP TABLE job_staging_gtt');

        -- Creation Phase (Omitting DDL body for brevity, assume correct)
        EXECUTE IMMEDIATE '
            CREATE TABLE row_count_control (
                schema_name VARCHAR2(128) NOT NULL, table_name VARCHAR2(128) NOT NULL,
                job_partition_id NUMBER NOT NULL, status VARCHAR2(10) DEFAULT ''PENDING'' NOT NULL,
                start_time TIMESTAMP, end_time TIMESTAMP, error_message VARCHAR2(4000),
                CONSTRAINT pk_control PRIMARY KEY (schema_name, table_name)
            )';

        EXECUTE IMMEDIATE '
            CREATE TABLE row_counts_scheduler_log (
                schema_name VARCHAR2(128) NOT NULL, table_name VARCHAR2(128) NOT NULL,
                row_count NUMBER, count_timestamp TIMESTAMP, job_name VARCHAR2(128),
                batch_id NUMBER, error_message VARCHAR2(4000),
                CONSTRAINT pk_final_log PRIMARY KEY (schema_name, table_name)
            )';

        EXECUTE IMMEDIATE '
            CREATE GLOBAL TEMPORARY TABLE job_staging_gtt (
                owner VARCHAR2(128), table_name VARCHAR2(128)
            ) ON COMMIT PRESERVE ROWS';
            
        -- Create Summary View
        v_ddl_statement := '
            CREATE OR REPLACE VIEW row_count_summary AS
            SELECT 
                COUNT(*) AS total_tasks, SUM(CASE WHEN status = ''COMPLETE'' THEN 1 ELSE 0 END) AS success_count,
                SUM(CASE WHEN status = ''FAILED'' THEN 1 ELSE 0 END) AS failed_count, SUM(CASE WHEN status = ''RUNNING'' THEN 1 ELSE 0 END) AS running_count,
                ROUND(SUM(CASE WHEN status = ''COMPLETE'' THEN 1 ELSE 0 END) / GREATEST(COUNT(*),1) * 100, 1) AS pct_complete,
                MAX(end_time) AS last_completion 
            FROM row_count_control';
        EXECUTE IMMEDIATE v_ddl_statement;
        
        COMMIT;
        DBMS_OUTPUT.PUT_LINE(' -> SYSTEM DDL SETUP COMPLETE.');
    END initialize_system;

    -- P2. Populates the Master Control Table using NTILE
    PROCEDURE prepare_workload IS
        v_total_count NUMBER;
        v_ddl_insert VARCHAR2(4000);
    BEGIN
        SELECT COUNT(*) INTO v_total_count 
        FROM all_tables WHERE owner IN ('HR','SALES','FINANCE') AND table_name NOT LIKE 'BIN$%';

        v_batches_needed := CEIL(v_total_count / c_batch_size);

        -- CRITICAL: MERGE is used to add only new tables, preserving existing statuses
        v_ddl_insert := '
            MERGE INTO row_count_control c
            USING (
                SELECT owner AS schema_name, table_name,
                        NTILE(:num_batches) OVER (ORDER BY owner, table_name) AS job_partition_id
                FROM all_tables
                WHERE owner IN (''HR'',''SALES'',''FINANCE'') AND table_name NOT LIKE ''BIN$%''
            ) s
            ON (c.schema_name = s.schema_name AND c.table_name = s.table_name)
            WHEN NOT MATCHED THEN
                INSERT (schema_name, table_name, job_partition_id, status)
                VALUES (s.schema_name, s.table_name, s.job_partition_id, ''PENDING'')';
                
        EXECUTE IMMEDIATE v_ddl_insert USING v_batches_needed;
        COMMIT;
        DBMS_OUTPUT.PUT_LINE(' -> WORKLOAD PREPARED. ' || SQL%ROWCOUNT || ' new tasks added/checked. Total partitions: ' || v_batches_needed);
    END prepare_workload;
    
    -- P3. Clears abandoned 'RUNNING' tasks
    PROCEDURE reset_stalled_tasks IS
        v_stale_threshold_minutes CONSTANT NUMBER := 60; 
    BEGIN
        UPDATE row_count_control
        SET status = 'PENDING', start_time = NULL, end_time = NULL, error_message = 'RESET: Task was forcefully recycled from RUNNING state.'
        WHERE status = 'RUNNING'
          AND start_time < SYSTIMESTAMP - INTERVAL '1' MINUTE * v_stale_threshold_minutes;
        COMMIT;
    END reset_stalled_tasks;
    
    -- P4. Creates the core counting procedure
    PROCEDURE create_counting_procedure IS
    BEGIN
        v_ddl_statement := '
            CREATE OR REPLACE PROCEDURE count_table_batch(p_batch_id NUMBER) AS
                v_sql_staging_filter VARCHAR2(4000); v_sql_count VARCHAR2(200); v_cnt NUMBER;
                v_job_name_log CONSTANT VARCHAR2(30) := ''BATCH_'' || p_batch_id; v_error_msg VARCHAR2(4000); 
                CURSOR c_staged_tables IS SELECT owner, table_name FROM job_staging_gtt;
            BEGIN
                -- Phase 1: STAGE WORKLOAD (Isolation and Resumption)
                EXECUTE IMMEDIATE ''TRUNCATE TABLE job_staging_gtt'';
                v_sql_staging_filter := ''
                    INSERT INTO job_staging_gtt (owner, table_name)
                    SELECT schema_name, table_name
                    FROM row_count_control
                    WHERE job_partition_id = :p_batch_id AND status IN (''''PENDING'''', ''''FAILED'''')
                    '';
                EXECUTE IMMEDIATE v_sql_staging_filter USING p_batch_id;
                COMMIT; 

                -- Phase 2: PROCESS STAGED WORKLOAD
                FOR r IN c_staged_tables LOOP
                    BEGIN 
                        -- Update status to RUNNING immediately (for stall detection)
                        UPDATE row_count_control SET status = ''RUNNING'', start_time = SYSTIMESTAMP
                        WHERE schema_name = r.owner AND table_name = r.table_name;
                        COMMIT; 

                        v_sql_count := ''SELECT COUNT(*) FROM "''||r.owner||''"."''||r.table_name||''"'';
                        EXECUTE IMMEDIATE v_sql_count INTO v_cnt;
                        
                        -- LOG SUCCESS and Mark COMPLETE
                        DELETE FROM row_counts_scheduler_log WHERE schema_name = r.owner AND table_name = r.table_name;
                        INSERT INTO row_counts_scheduler_log (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
                        VALUES (r.owner, r.table_name, v_cnt, SYSTIMESTAMP, v_job_name_log, p_batch_id, NULL);

                        UPDATE row_count_control SET status = ''COMPLETE'', end_time = SYSTIMESTAMP, error_message = NULL
                        WHERE schema_name = r.owner AND table_name = r.table_name;
                        COMMIT; 

                    EXCEPTION 
                        WHEN OTHERS THEN 
                            v_error_msg := SUBSTR(SQLERRM, 1, 4000);
                            -- Mark task as FAILED (prevents automatic retries until DBA intervenes)
                            UPDATE row_count_control SET status = ''FAILED'', end_time = SYSTIMESTAMP, error_message = v_error_msg
                            WHERE schema_name = r.owner AND table_name = r.table_name;
                            INSERT INTO row_counts_scheduler_log (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
                            VALUES (r.owner, r.table_name, -1, SYSTIMESTAMP, v_job_name_log, p_batch_id, v_error_msg);
                            COMMIT; 
                    END;
                END LOOP;
            END;
            ';
        EXECUTE IMMEDIATE v_ddl_statement;
        DBMS_OUTPUT.PUT_LINE(' -> Counting Procedure created.');
    END create_counting_procedure;


BEGIN
    --------------------------------------------------------------------------------------------------
    -- 0. INITIAL LAUNCH OR RESUME CHECK
    --------------------------------------------------------------------------------------------------
    IF NOT object_exists('ROW_COUNT_CONTROL') THEN
        DBMS_OUTPUT.PUT_LINE('--- PHASE 1: FULL SYSTEM INITIALIZATION (FIRST RUN) ---');
        initialize_system; 
        prepare_workload; -- Populates the queue for the very first time
    ELSE
        DBMS_OUTPUT.PUT_LINE('--- PHASE 1: SYSTEM IS LIVE. INITIATING RESTART/RESUME PROTOCOL ---');
        prepare_workload; -- Check for new schemas/tables added since last run
    END IF;
    
    create_counting_procedure; -- Ensure procedure is compiled with latest logic

    --------------------------------------------------------------------------------------------------
    -- 1. EXECUTION AND RESTART PROTOCOL
    --------------------------------------------------------------------------------------------------
    
    DBMS_OUTPUT.PUT_LINE('PHASE 2: EXECUTING RESTART PROTOCOL');
    
    -- Step A: Clear abandoned locks and recycle tasks
    reset_stalled_tasks; 

    -- 🌟 CRITICAL FIX: Get final batch count from the actual MAX job partition ID 🌟
    SELECT MAX(job_partition_id) INTO v_batches_needed
    FROM row_count_control;
    
    -- Safety check for the case where the control table is empty
    IF v_batches_needed IS NULL THEN
        DBMS_OUTPUT.PUT_LINE('ERROR: No tables found in the control queue. Cannot proceed.');
        RETURN;
    END IF;

    -- Step B: Launch/Relaunch Execution
    DBMS_OUTPUT.PUT_LINE('Launching up to ' || v_batches_needed || ' parallel jobs...');

    FOR i IN 1..v_batches_needed LOOP
        v_job_name := c_job_prefix || LPAD(i, 2, '0');
        
        -- Check if this batch has any PENDING or FAILED work before launching (Optimization)
        SELECT COUNT(*) INTO v_pending_in_batch
        FROM row_count_control
        WHERE job_partition_id = i AND status IN ('PENDING', 'FAILED');

        IF v_pending_in_batch > 0 THEN
            -- Cleanup old job definition (Avoids ORA-27475 crash)
            execute_ddl('BEGIN DBMS_SCHEDULER.DROP_JOB(''' || v_job_name || ''', TRUE); END;'); 

            -- Create, Set Argument, and Enable NEW JOB
            DBMS_SCHEDULER.CREATE_JOB (
                job_name => v_job_name, job_type => 'STORED_PROCEDURE', job_action => 'COUNT_TABLE_BATCH',
                number_of_arguments => 1, start_date => SYSTIMESTAMP, repeat_interval => NULL, enabled => FALSE 
            );
            
            -- Set argument (using TO_CHAR to avoid PLS-00307 ambiguity)
            DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE(v_job_name, 1, TO_CHAR(i));
            DBMS_SCHEDULER.ENABLE(v_job_name);
            
            v_total_submitted := v_total_submitted + 1;
            DBMS_OUTPUT.PUT_LINE('  -> Submitted Job ' || v_job_name || ' (' || v_pending_in_batch || ' tasks pending/failed)');
        END IF;
    END LOOP;
    
    COMMIT;
    DBMS_OUTPUT.PUT_LINE('--- FINAL SUCCESS: ALL ' || v_total_submitted || ' jobs successfully launched/resumed. ---');

EXCEPTION
    WHEN OTHERS THEN
        DBMS_OUTPUT.PUT_LINE('!!! FATAL ERROR IN MASTER BLOCK !!! SQLERRM: ' || SQLERRM);
        ROLLBACK;
        RAISE;
END;
/

====================================================================

-- DBA recovery action: push abandoned and failed tasks back into the work queue.
UPDATE row_count_control
SET status        = 'PENDING',
    start_time    = NULL,
    end_time      = NULL,
    error_message = 'RETRY FORCED by DBA. Originally: ' || NVL(error_message, 'Unknown.')
WHERE status = 'FAILED'
   -- Also reclaim tasks abandoned mid-flight (stuck in RUNNING for over an hour)
   OR (status = 'RUNNING'
       AND start_time < SYSTIMESTAMP - INTERVAL '60' MINUTE);

COMMIT;

Final Production Row Count System (Single Executable)

/*

This script is structured as an anonymous block that calls internal procedures to manage the entire lifecycle.

Usage Instructions

  1. Replace Schemas: Update the owner IN ('HR', 'SALES', 'FINANCE') list with your actual schema names in the DDL procedure definitions.

  2. Execute the Entire Script: Run the entire code block once.

    • The first time, it performs the full setup.

    • For subsequent runs, it skips the initial setup and goes straight to the Submission/Restart phase.

      */

 



SET SERVEROUTPUT ON SIZE UNLIMITED
WHENEVER SQLERROR EXIT FAILURE

DECLARE
    -- Configuration
    c_batch_size        CONSTANT NUMBER := 40;                    -- tables per scheduler job
    c_job_prefix        CONSTANT VARCHAR2(15) := 'BATCH_COUNT_';  -- scheduler job name prefix

    -- Runtime state shared by the nested procedures below.
    -- (v_total_source_count was declared here previously but never referenced
    --  anywhere in this block; removed as dead code.)
    v_batches_needed     NUMBER;          -- number of parallel batches/jobs to launch
    v_job_name           VARCHAR2(128);   -- current DBMS_SCHEDULER job name
    v_ddl_statement      VARCHAR2(4000);  -- scratch buffer for dynamic DDL text
    v_total_submitted    NUMBER := 0;     -- running tally of jobs actually launched

    -- Helper procedure to safely attempt DDL destruction (IGNORES 'DOES NOT EXIST')
    PROCEDURE safe_execute_drop(p_sql IN VARCHAR2) IS
    BEGIN
        EXECUTE IMMEDIATE p_sql;
    EXCEPTION
        WHEN OTHERS THEN
            -- ORA-00942 (table/view does not exist)
            -- ORA-04043 (object does not exist)
            IF SQLCODE NOT IN (-942, -4043) THEN 
                RAISE; 
            END IF;
    END safe_execute_drop;

    -- P1. One-shot DDL bootstrap: drops and recreates every object the row-count
    -- system needs (control table, results log, staging GTT, summary view).
    -- Destructive by design: any prior progress in the control/log tables is lost.
    PROCEDURE initialize_system IS
    BEGIN
        DBMS_OUTPUT.PUT_LINE('--- PHASES 0 & 1: FULL SYSTEM INITIALIZATION ---');
        
        -- 1. INDESTRUCTIBLE CLEANUP PHASE: Attempt to drop everything, ignore errors
        -- (safe_execute_drop swallows only ORA-00942/ORA-04043, so a missing object
        --  is fine but any other drop failure still aborts initialization).
        safe_execute_drop('DROP VIEW row_count_summary');
        safe_execute_drop('DROP TABLE row_count_control CASCADE CONSTRAINTS');
        safe_execute_drop('DROP TABLE row_counts_scheduler_log CASCADE CONSTRAINTS'); 
        -- NOTE(review): dropping a GTT raises ORA-14452 if another session still has
        -- rows bound to it; that error is not in the ignored list -- confirm intended.
        safe_execute_drop('DROP TABLE job_staging_gtt');

        -- 2. Creation Phase (Must succeed now that dependencies are gone)
        -- Work queue: one row per (schema, table); status drives the batch jobs.
        EXECUTE IMMEDIATE '
            CREATE TABLE row_count_control (
                schema_name VARCHAR2(128) NOT NULL, table_name VARCHAR2(128) NOT NULL,
                job_partition_id NUMBER NOT NULL, status VARCHAR2(10) DEFAULT ''PENDING'' NOT NULL,
                start_time TIMESTAMP, end_time TIMESTAMP, error_message VARCHAR2(4000),
                CONSTRAINT pk_control PRIMARY KEY (schema_name, table_name)
            )';

        -- Durable results log: one row per table (PK enforced), overwritten on recount.
        EXECUTE IMMEDIATE '
            CREATE TABLE row_counts_scheduler_log (
                schema_name VARCHAR2(128) NOT NULL, table_name VARCHAR2(128) NOT NULL,
                row_count NUMBER, count_timestamp TIMESTAMP, job_name VARCHAR2(128),
                batch_id NUMBER, error_message VARCHAR2(4000),
                CONSTRAINT pk_final_log PRIMARY KEY (schema_name, table_name)
            )';

        -- Session-private staging area each batch job uses to snapshot its workload;
        -- PRESERVE ROWS keeps the staged rows across the job's intermediate COMMITs.
        EXECUTE IMMEDIATE '
            CREATE GLOBAL TEMPORARY TABLE job_staging_gtt (
                owner VARCHAR2(128), table_name VARCHAR2(128)
            ) ON COMMIT PRESERVE ROWS';
            
        -- Create Summary View (aggregated progress dashboard over the control table)
        v_ddl_statement := '
            CREATE OR REPLACE VIEW row_count_summary AS
            SELECT 
                COUNT(*) AS total_tasks, SUM(CASE WHEN status = ''COMPLETE'' THEN 1 ELSE 0 END) AS success_count,
                SUM(CASE WHEN status = ''FAILED'' THEN 1 ELSE 0 END) AS failed_count, SUM(CASE WHEN status = ''RUNNING'' THEN 1 ELSE 0 END) AS running_count,
                ROUND(SUM(CASE WHEN status = ''COMPLETE'' THEN 1 ELSE 0 END) / GREATEST(COUNT(*),1) * 100, 1) AS pct_complete,
                MAX(end_time) AS last_completion 
            FROM row_count_control';
        EXECUTE IMMEDIATE v_ddl_statement;
        
        -- DDL auto-commits in Oracle; this COMMIT is a harmless no-op kept for clarity.
        COMMIT;
        DBMS_OUTPUT.PUT_LINE('  -> SYSTEM DDL SETUP COMPLETE.');
    END initialize_system;

    -- P2. Populates the Master Control Table using NTILE
    PROCEDURE prepare_workload IS
        v_total_count NUMBER;
        v_ddl_insert VARCHAR2(4000);
    BEGIN
        SELECT COUNT(*) INTO v_total_count 
        FROM all_tables WHERE owner IN ('HR','SALES','FINANCE') AND table_name NOT LIKE 'BIN$%';

        v_batches_needed := CEIL(v_total_count / c_batch_size);

        DELETE FROM row_count_control; -- Clear old work data
        
        v_ddl_insert := '
            INSERT INTO row_count_control (schema_name, table_name, job_partition_id)
            SELECT
                owner, table_name,
                NTILE(:num_batches) OVER (ORDER BY owner, table_name) AS job_partition_id
            FROM
                all_tables
            WHERE
                owner IN (''HR'',''SALES'',''FINANCE'') AND table_name NOT LIKE ''BIN$%''';
                
        EXECUTE IMMEDIATE v_ddl_insert USING v_batches_needed;
        COMMIT;
        DBMS_OUTPUT.PUT_LINE('  -> WORKLOAD PREPARED. ' || v_total_count || ' tasks partitioned into ' || v_batches_needed || ' batches.');
    END prepare_workload;
    
    -- P3. Clears abandoned 'RUNNING' tasks
    PROCEDURE reset_stalled_tasks IS
        v_stale_threshold_minutes CONSTANT NUMBER := 60; 
    BEGIN
        -- Resets stalled tasks to PENDING for recycling
        UPDATE row_count_control
        SET status = 'PENDING', start_time = NULL, end_time = NULL, error_message = 'RESET: Task was forcefully recycled from RUNNING state.'
        WHERE status = 'RUNNING'
          AND start_time < SYSTIMESTAMP - INTERVAL '1' MINUTE * v_stale_threshold_minutes;
        COMMIT;
    END reset_stalled_tasks;
    
    -- P4. Creates the core counting procedure (The Engine)
    PROCEDURE create_counting_procedure IS
    BEGIN
        v_ddl_statement := '
            CREATE OR REPLACE PROCEDURE count_table_batch(p_batch_id NUMBER) AS
                v_sql_staging_filter VARCHAR2(4000); v_sql_count VARCHAR2(200); v_cnt NUMBER;
                v_job_name_log CONSTANT VARCHAR2(30) := ''BATCH_'' || p_batch_id; v_error_msg VARCHAR2(4000); 
                CURSOR c_staged_tables IS SELECT owner, table_name FROM job_staging_gtt;
            BEGIN
                -- Phase 1: STAGE WORKLOAD 
                EXECUTE IMMEDIATE ''TRUNCATE TABLE job_staging_gtt'';
                v_sql_staging_filter := ''
                    INSERT INTO job_staging_gtt (owner, table_name)
                    SELECT schema_name, table_name
                    FROM row_count_control
                    WHERE job_partition_id = :p_batch_id AND status IN (''''PENDING'''', ''''FAILED'''')
                    '';
                EXECUTE IMMEDIATE v_sql_staging_filter USING p_batch_id; -- Includes PENDING and FAILED for retry
                COMMIT; 

                -- Phase 2: PROCESS STAGED WORKLOAD
                FOR r IN c_staged_tables LOOP
                    BEGIN 
                        -- Update status to RUNNING immediately 
                        UPDATE row_count_control SET status = ''RUNNING'', start_time = SYSTIMESTAMP
                        WHERE schema_name = r.owner AND table_name = r.table_name;
                        COMMIT; 

                        v_sql_count := ''SELECT COUNT(*) FROM "''||r.owner||''"."''||r.table_name||''"'';
                        EXECUTE IMMEDIATE v_sql_count INTO v_cnt;
                        
                        -- LOG SUCCESS and Mark COMPLETE
                        DELETE FROM row_counts_scheduler_log WHERE schema_name = r.owner AND table_name = r.table_name;
                        INSERT INTO row_counts_scheduler_log (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
                        VALUES (r.owner, r.table_name, v_cnt, SYSTIMESTAMP, v_job_name_log, p_batch_id, NULL);

                        UPDATE row_count_control SET status = ''COMPLETE'', end_time = SYSTIMESTAMP, error_message = NULL
                        WHERE schema_name = r.owner AND table_name = r.table_name;
                        COMMIT; 

                    EXCEPTION 
                        WHEN OTHERS THEN 
                            v_error_msg := SUBSTR(SQLERRM, 1, 4000);
                            -- Mark task as FAILED (prevents automatic retries until DBA intervenes)
                            UPDATE row_count_control SET status = ''FAILED'', end_time = SYSTIMESTAMP, error_message = v_error_msg
                            WHERE schema_name = r.owner AND table_name = r.table_name;
                            INSERT INTO row_counts_scheduler_log (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
                            VALUES (r.owner, r.table_name, -1, SYSTIMESTAMP, v_job_name_log, p_batch_id, v_error_msg);
                            COMMIT; 
                    END;
                END LOOP;
            END;
            ';
        EXECUTE IMMEDIATE v_ddl_statement;
    END create_counting_procedure;


BEGIN
    --------------------------------------------------------------------------------------------------
    -- 0. INITIAL LAUNCH OR RESUME CHECK
    --------------------------------------------------------------------------------------------------
    -- object_exists and execute_ddl are schema-level helpers defined outside this script.
    IF NOT object_exists('ROW_COUNT_CONTROL') THEN
        initialize_system; 
        prepare_workload; -- Populates the queue for the very first time
    END IF;
    
    create_counting_procedure; -- Ensure procedure is compiled with latest logic

    --------------------------------------------------------------------------------------------------
    -- 1. EXECUTION AND RESTART PROTOCOL
    --------------------------------------------------------------------------------------------------
    
    DBMS_OUTPUT.PUT_LINE('PHASE 2: EXECUTING RESTART PROTOCOL');
    
    -- Step A: Clear abandoned locks and recycle tasks
    reset_stalled_tasks; 

    -- BUG FIX: derive the batch count from the partition IDs actually persisted
    -- by NTILE. Recomputing CEIL(COUNT(*) / c_batch_size) can disagree with the
    -- stored partitioning (e.g. if c_batch_size changed between runs), leaving
    -- high-numbered batches unserved or launching jobs with no work.
    SELECT MAX(job_partition_id) INTO v_batches_needed
    FROM row_count_control;

    -- MAX over zero rows returns NULL: guard the empty-queue case.
    IF v_batches_needed IS NULL THEN
        DBMS_OUTPUT.PUT_LINE('ERROR: No tables found in the control queue. Cannot proceed.');
        RETURN;
    END IF;

    -- Step B: Launch/Relaunch Execution
    DBMS_OUTPUT.PUT_LINE('Launching ' || v_batches_needed || ' parallel jobs...');

    FOR i IN 1..v_batches_needed LOOP
        v_job_name := c_job_prefix || LPAD(i, 2, '0');
        
        -- Cleanup old job definition (Always drop before recreating for safety)
        execute_ddl('BEGIN DBMS_SCHEDULER.DROP_JOB(''' || v_job_name || ''', TRUE); END;'); 

        -- Create, Set Argument, and Enable NEW JOB
        DBMS_SCHEDULER.CREATE_JOB (
            job_name => v_job_name, job_type => 'STORED_PROCEDURE', job_action => 'COUNT_TABLE_BATCH',
            number_of_arguments => 1, start_date => SYSTIMESTAMP, repeat_interval => NULL, enabled => FALSE 
        );
        
        -- TO_CHAR avoids PLS-00307 overload ambiguity in SET_JOB_ARGUMENT_VALUE
        DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE(v_job_name, 1, TO_CHAR(i));
        DBMS_SCHEDULER.ENABLE(v_job_name);
        
        v_total_submitted := v_total_submitted + 1;
    END LOOP;
    
    COMMIT;
    DBMS_OUTPUT.PUT_LINE('--- ALL ' || v_total_submitted || ' jobs successfully launched/resumed. ---');

EXCEPTION
    WHEN OTHERS THEN
        DBMS_OUTPUT.PUT_LINE('!!! FATAL ERROR IN MASTER BLOCK !!! SQLERRM: ' || SQLERRM);
        ROLLBACK;
        RAISE;
END;
/

A. Query to Identify Targets

First, use a query to view all current failures:

SQL
SELECT
    schema_name,
    table_name,
    error_message
FROM
    row_count_control
WHERE
    status = 'FAILED'
ORDER BY
    error_message;

B. The Update Command (Retry Action)

You have two options for the update: retry all failed tasks or retry a specific task.

Option 1: Retry ALL Failed Tasks (Recommended after a global fix)

SQL
-- Reset every FAILED task to PENDING. Wrapped in a PL/SQL block because a bare
-- DBMS_OUTPUT.PUT_LINE is not a valid standalone SQL*Plus statement, and
-- SQL%ROWCOUNT must be read right after the UPDATE it describes.
BEGIN
    UPDATE row_count_control
    SET 
        status = 'PENDING',
        error_message = 'RETRY PENDING - Root cause fixed.'
    WHERE 
        status = 'FAILED';

    DBMS_OUTPUT.PUT_LINE(SQL%ROWCOUNT || ' FAILED tasks have been reset to PENDING.');
    COMMIT;
END;
/

Option 2: Retry a Specific Table (Recommended for testing the fix)

SQL
-- Reset a single FAILED task to PENDING. Wrapped in a PL/SQL block because a
-- bare DBMS_OUTPUT.PUT_LINE is not a valid standalone SQL*Plus statement.
BEGIN
    UPDATE row_count_control
    SET 
        status = 'PENDING',
        error_message = 'RETRY PENDING - Permission issue resolved.'
    WHERE 
        status = 'FAILED'
        AND schema_name = 'FINANCE'
        AND table_name = 'LEDGER_BALANCE_BIG';

    COMMIT;
    DBMS_OUTPUT.PUT_LINE('Specific table set to PENDING.');
END;
/
====================

This step clears any tasks abandoned by jobs that crashed but left their status as 'RUNNING'.

| Action | Command/Query | Purpose |
| --- | --- | --- |
| Recycle Stalled Tasks | EXEC reset_stalled_tasks; | Forces any job stuck in the RUNNING state for over 60 minutes back to the PENDING queue for recycling. |

RESET_STALLED_TASKS Procedure


-- Requeues tasks abandoned in the RUNNING state: any task started more than
-- v_stale_threshold_minutes ago is pushed back to PENDING so the next launch
-- retries it. Reports how many rows were recycled via DBMS_OUTPUT.
CREATE OR REPLACE PROCEDURE reset_stalled_tasks AS
    v_stale_threshold_minutes CONSTANT NUMBER := 60; -- staleness window in minutes
    v_reset_count NUMBER;
BEGIN
    DBMS_OUTPUT.PUT_LINE('--- Initiating Stalled Task Cleanup ---');

    UPDATE row_count_control
       SET status        = 'PENDING',
           start_time    = NULL,
           end_time      = NULL,
           error_message = 'RESET: Task was forcefully recycled from RUNNING state.'
     WHERE status = 'RUNNING'
       AND start_time < SYSTIMESTAMP - NUMTODSINTERVAL(v_stale_threshold_minutes, 'MINUTE');

    v_reset_count := SQL%ROWCOUNT;

    IF v_reset_count = 0 THEN
        DBMS_OUTPUT.PUT_LINE('No stalled tasks found.');
    ELSE
        DBMS_OUTPUT.PUT_LINE('!!! WARNING: ' || v_reset_count || ' stalled tasks were reset to PENDING and are ready for retry.');
    END IF;

    COMMIT;

EXCEPTION
    WHEN OTHERS THEN
        DBMS_OUTPUT.PUT_LINE('FATAL ERROR during stalled task reset: ' || SQLERRM);
        -- Rollback only affects this small transaction; included for safety.
        ROLLBACK; 
        RAISE;
END;
/

==========

-- Aggregated progress dashboard over the work queue.
-- FIX: COUNT(CASE ... THEN 1 END) replaces SUM(CASE ... ELSE 0 END) so that an
-- EMPTY control table yields 0 for every counter instead of NULL (SUM over zero
-- rows is NULL). On a non-empty table the two forms produce identical values.
CREATE OR REPLACE VIEW row_count_summary AS
SELECT
    -- Total tasks submitted to the work queue
    COUNT(*) AS total_tasks,
    
    -- Status Counts (COUNT ignores the NULL produced by non-matching CASE rows)
    COUNT(CASE WHEN status = 'COMPLETE' THEN 1 END) AS successful_count,
    COUNT(CASE WHEN status = 'FAILED'   THEN 1 END) AS failed_count,
    COUNT(CASE WHEN status = 'RUNNING'  THEN 1 END) AS running_count,
    COUNT(CASE WHEN status = 'PENDING'  THEN 1 END) AS pending_count,
    
    -- Percentage complete; GREATEST(...,1) guards the division on an empty table
    ROUND(
        COUNT(CASE WHEN status = 'COMPLETE' THEN 1 END) / GREATEST(COUNT(*), 1) * 100, 
        1
    ) AS pct_complete,
    
    -- Timestamp of the last successful activity (NULL until something completes)
    MAX(end_time) AS last_completion_time 
FROM
    row_count_control;


-- One-row overall progress snapshot, formatted for the operator console.
SELECT 'TOTAL WORKLOAD STATUS' AS metric_group,
       total_tasks,
       successful_count,
       failed_count,
       running_count,
       pending_count,
       pct_complete,
       TO_CHAR(last_completion_time, 'YYYY-MM-DD HH24:MI:SS') AS last_update
FROM row_count_summary;


-- Per-schema progress breakdown, least-complete schemas first.
SELECT schema_name,
       COUNT(*) AS total_tables,
       COUNT(CASE WHEN status = 'COMPLETE' THEN 1 END) AS done_count,
       COUNT(CASE WHEN status = 'FAILED'   THEN 1 END) AS failed_count,
       COUNT(CASE WHEN status = 'PENDING'  THEN 1 END) AS pending_count,
       -- Every GROUP BY group has at least one row, so COUNT(*) is never zero here.
       ROUND(COUNT(CASE WHEN status = 'COMPLETE' THEN 1 END) / COUNT(*) * 100, 1) AS pct_done
FROM row_count_control
GROUP BY schema_name
ORDER BY pct_done, schema_name;



-- Drill-down: control-table status joined to the final logged count per table.
-- LEFT JOIN so tables with no logged count yet still appear (log columns NULL).
SELECT ctl.schema_name,
       ctl.table_name,
       ctl.status AS control_status,
       ctl.job_partition_id AS current_batch,
       TO_CHAR(lg.row_count, '999,999,999,999') AS final_row_count,
       TO_CHAR(lg.count_timestamp, 'YYYY-MM-DD HH24:MI:SS') AS count_time
FROM row_count_control ctl
LEFT JOIN row_counts_scheduler_log lg
       ON lg.schema_name = ctl.schema_name
      AND lg.table_name = ctl.table_name
WHERE ctl.table_name = 'YOUR_SPECIFIC_TABLE_NAME' -- or filter on ctl.status IN ('RUNNING', 'PENDING') for the full work list
ORDER BY ctl.status DESC, ctl.schema_name, ctl.table_name;


To query the row count for a single table using the data you've collected, you must query the row_counts_scheduler_log table directly.

Here is the most efficient and accurate query:

SQL
-- Fetch the most recent successful count for one specific table.
SELECT schema_name,
       table_name,
       TO_CHAR(row_count, '999,999,999,999') AS final_row_count,
       TO_CHAR(count_timestamp, 'YYYY-MM-DD HH24:MI:SS') AS count_time
FROM row_counts_scheduler_log
WHERE schema_name = 'YOUR_SCHEMA_NAME'  -- 👈 Replace with the target schema (e.g., 'HR')
  AND table_name = 'YOUR_TABLE_NAME'    -- 👈 Replace with the target table name
  AND row_count >= 0                    -- failed attempts are logged with -1
ORDER BY count_timestamp DESC
FETCH NEXT 1 ROW ONLY;