Sunday, October 19, 2025

Final Production Row Count System (Single Executable)

/*

This script is structured as an anonymous block that calls internal procedures to manage the entire lifecycle.

Usage Instructions

  1. Replace Schemas: Update the owner IN ('HR', 'SALES', 'FINANCE') list with your actual schema names wherever it appears in the workload-preparation code (prepare_workload).

  2. Execute the Entire Script: Run the entire code block once.

    • The first time, it performs the full setup.

    • For subsequent runs, it skips the initial setup and goes straight to the Submission/Restart phase.

      */

 



-- Show DBMS_OUTPUT text in the client, with no limit on the output buffer size,
-- so the progress messages printed by the block below are visible.
SET SERVEROUTPUT ON SIZE UNLIMITED
-- Stop the script and exit SQL*Plus with a failure status as soon as any
-- SQL statement or PL/SQL block raises an error.
WHENEVER SQLERROR EXIT FAILURE

DECLARE
    ------------------------------------------------------------------------------------------
    -- Master driver for the row-count system.
    --
    -- First run : creates the control/log/staging tables and the summary view, then fills
    --             the work queue from ALL_TABLES.
    -- Every run : (re)compiles COUNT_TABLE_BATCH, recycles stalled tasks and submits one
    --             DBMS_SCHEDULER job per batch.
    --
    -- Every statement that touches an object created by this script goes through
    -- EXECUTE IMMEDIATE. The block is compiled as a whole before it runs, so a static
    -- reference to ROW_COUNT_CONTROL would fail with ORA-00942 on a fresh schema before
    -- the setup code ever got the chance to create the table.
    ------------------------------------------------------------------------------------------

    -- Configuration
    c_batch_size   CONSTANT NUMBER        := 40;              -- target number of tables per job
    c_job_prefix   CONSTANT VARCHAR2(15)  := 'BATCH_COUNT_';  -- scheduler job name prefix
    -- Single place that decides which tables are counted (recycle-bin objects excluded).
    -- Edit the schema list here; both the sizing query and the queue load use it.
    c_table_filter CONSTANT VARCHAR2(400) :=
        q'[owner IN ('HR','SALES','FINANCE') AND table_name NOT LIKE 'BIN$%']';

    -- Runtime variables
    v_batches_needed  NUMBER;
    v_job_name        VARCHAR2(128);
    v_total_submitted NUMBER := 0;

    -- Returns TRUE when the current schema owns an object with the given name.
    FUNCTION object_exists(p_object_name IN VARCHAR2) RETURN BOOLEAN IS
        v_matches NUMBER;
    BEGIN
        SELECT COUNT(*)
          INTO v_matches
          FROM user_objects
         WHERE object_name = UPPER(p_object_name);

        RETURN v_matches > 0;
    END object_exists;

    -- Runs a DROP statement and ignores only "object is not there" errors.
    PROCEDURE safe_execute_drop(p_sql IN VARCHAR2) IS
    BEGIN
        EXECUTE IMMEDIATE p_sql;
    EXCEPTION
        WHEN OTHERS THEN
            -- ORA-00942 (table/view does not exist)
            -- ORA-04043 (object does not exist)
            IF SQLCODE NOT IN (-942, -4043) THEN
                RAISE;
            END IF;
    END safe_execute_drop;

    -- Drops a scheduler job (stopping it if it is running) and ignores only
    -- ORA-27475, which is raised when the job does not exist.
    PROCEDURE drop_job_if_exists(p_job_name IN VARCHAR2) IS
    BEGIN
        DBMS_SCHEDULER.DROP_JOB(job_name => p_job_name, force => TRUE);
    EXCEPTION
        WHEN OTHERS THEN
            IF SQLCODE != -27475 THEN
                RAISE;
            END IF;
    END drop_job_if_exists;

    -- P1. Drops any leftovers and creates the tables and the summary view.
    PROCEDURE initialize_system IS
    BEGIN
        -- CHR(38) is the ampersand; spelling it this way keeps the literal out of reach
        -- of client-side substitution-variable scanning. The printed text is unchanged.
        DBMS_OUTPUT.PUT_LINE('--- PHASES 0 ' || CHR(38) || ' 1: FULL SYSTEM INITIALIZATION ---');

        -- 1. Cleanup: drop whatever a previous (possibly partial) setup left behind.
        safe_execute_drop('DROP VIEW row_count_summary');
        safe_execute_drop('DROP TABLE row_count_control CASCADE CONSTRAINTS');
        safe_execute_drop('DROP TABLE row_counts_scheduler_log CASCADE CONSTRAINTS');
        safe_execute_drop('DROP TABLE job_staging_gtt');

        -- 2. Creation.
        -- Work queue: one row per table to count.
        EXECUTE IMMEDIATE q'[
            CREATE TABLE row_count_control (
                schema_name      VARCHAR2(128) NOT NULL,
                table_name       VARCHAR2(128) NOT NULL,
                job_partition_id NUMBER        NOT NULL,
                status           VARCHAR2(10)  DEFAULT 'PENDING' NOT NULL,
                start_time       TIMESTAMP,
                end_time         TIMESTAMP,
                error_message    VARCHAR2(4000),
                CONSTRAINT pk_control PRIMARY KEY (schema_name, table_name)
            )]';

        -- Result log: at most one row per table (latest result; row_count = -1 on failure).
        EXECUTE IMMEDIATE q'[
            CREATE TABLE row_counts_scheduler_log (
                schema_name     VARCHAR2(128) NOT NULL,
                table_name      VARCHAR2(128) NOT NULL,
                row_count       NUMBER,
                count_timestamp TIMESTAMP,
                job_name        VARCHAR2(128),
                batch_id        NUMBER,
                error_message   VARCHAR2(4000),
                CONSTRAINT pk_final_log PRIMARY KEY (schema_name, table_name)
            )]';

        -- Per-session staging area used by COUNT_TABLE_BATCH.
        EXECUTE IMMEDIATE q'[
            CREATE GLOBAL TEMPORARY TABLE job_staging_gtt (
                owner      VARCHAR2(128),
                table_name VARCHAR2(128)
            ) ON COMMIT PRESERVE ROWS]';

        -- Summary view. Column names match the standalone view definition and the
        -- monitoring query shipped with this script (successful_count, pending_count,
        -- last_completion_time), so those queries work right after setup.
        EXECUTE IMMEDIATE q'[
            CREATE OR REPLACE VIEW row_count_summary AS
            SELECT
                COUNT(*) AS total_tasks,
                SUM(CASE WHEN status = 'COMPLETE' THEN 1 ELSE 0 END) AS successful_count,
                SUM(CASE WHEN status = 'FAILED'   THEN 1 ELSE 0 END) AS failed_count,
                SUM(CASE WHEN status = 'RUNNING'  THEN 1 ELSE 0 END) AS running_count,
                SUM(CASE WHEN status = 'PENDING'  THEN 1 ELSE 0 END) AS pending_count,
                ROUND(SUM(CASE WHEN status = 'COMPLETE' THEN 1 ELSE 0 END)
                      / GREATEST(COUNT(*), 1) * 100, 1) AS pct_complete,
                MAX(end_time) AS last_completion_time
            FROM row_count_control]';

        COMMIT;
        DBMS_OUTPUT.PUT_LINE('  -> SYSTEM DDL SETUP COMPLETE.');
    END initialize_system;

    -- P2. Rebuilds the work queue and spreads the tables evenly over the batches with NTILE.
    --     Sets v_batches_needed as a side effect.
    PROCEDURE prepare_workload IS
        v_total_count NUMBER;
    BEGIN
        EXECUTE IMMEDIATE 'SELECT COUNT(*) FROM all_tables WHERE ' || c_table_filter
            INTO v_total_count;

        v_batches_needed := CEIL(v_total_count / c_batch_size);

        -- Intentionally unfiltered: the whole queue is rebuilt from scratch.
        EXECUTE IMMEDIATE 'DELETE FROM row_count_control';

        -- NTILE needs a positive bucket count, so skip the load when nothing matched.
        IF v_total_count > 0 THEN
            EXECUTE IMMEDIATE '
                INSERT INTO row_count_control (schema_name, table_name, job_partition_id)
                SELECT
                    owner,
                    table_name,
                    NTILE(:num_batches) OVER (ORDER BY owner, table_name) AS job_partition_id
                FROM
                    all_tables
                WHERE ' || c_table_filter
                USING v_batches_needed;
        END IF;

        COMMIT;
        DBMS_OUTPUT.PUT_LINE('  -> WORKLOAD PREPARED. ' || v_total_count || ' tasks partitioned into ' || v_batches_needed || ' batches.');
    END prepare_workload;

    -- P3. Puts tasks that have been RUNNING for longer than the threshold back to PENDING.
    PROCEDURE reset_stalled_tasks IS
        c_stale_threshold_minutes CONSTANT NUMBER := 60;
    BEGIN
        EXECUTE IMMEDIATE q'[
            UPDATE row_count_control
            SET status = 'PENDING',
                start_time = NULL,
                end_time = NULL,
                error_message = 'RESET: Task was forcefully recycled from RUNNING state.'
            WHERE status = 'RUNNING'
              AND start_time < SYSTIMESTAMP - NUMTODSINTERVAL(:stale_minutes, 'MINUTE')]'
            USING c_stale_threshold_minutes;
        COMMIT;
    END reset_stalled_tasks;

    -- P4. (Re)creates COUNT_TABLE_BATCH, the procedure each scheduler job runs.
    --     It stages the PENDING and FAILED tasks of its batch, then counts each table,
    --     committing per table so progress is visible while the job is still running.
    PROCEDURE create_counting_procedure IS
    BEGIN
        EXECUTE IMMEDIATE q'[
            CREATE OR REPLACE PROCEDURE count_table_batch(p_batch_id NUMBER) AS
                v_job_name_log CONSTANT VARCHAR2(128) := 'BATCH_' || p_batch_id;
                -- Two quoted identifiers of up to 128 characters each plus the fixed text.
                v_sql_count    VARCHAR2(512);
                v_cnt          NUMBER;
                v_error_msg    VARCHAR2(4000);

                CURSOR c_staged_tables IS
                    SELECT owner, table_name FROM job_staging_gtt;

                -- Stores the latest result for one table. The log holds one row per
                -- table (primary key), so any previous row is replaced; this is what
                -- makes a repeated failure safe instead of raising ORA-00001.
                PROCEDURE write_log(
                    p_owner     IN VARCHAR2,
                    p_table     IN VARCHAR2,
                    p_row_count IN NUMBER,
                    p_error     IN VARCHAR2
                ) IS
                BEGIN
                    DELETE FROM row_counts_scheduler_log
                    WHERE schema_name = p_owner AND table_name = p_table;

                    INSERT INTO row_counts_scheduler_log
                        (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
                    VALUES
                        (p_owner, p_table, p_row_count, SYSTIMESTAMP, v_job_name_log, p_batch_id, p_error);
                END write_log;
            BEGIN
                -- Phase 1: stage this batch's open tasks (PENDING, plus FAILED for retry).
                EXECUTE IMMEDIATE 'TRUNCATE TABLE job_staging_gtt';

                INSERT INTO job_staging_gtt (owner, table_name)
                SELECT schema_name, table_name
                FROM row_count_control
                WHERE job_partition_id = p_batch_id
                  AND status IN ('PENDING', 'FAILED');
                COMMIT;

                -- Phase 2: count each staged table.
                FOR r IN c_staged_tables LOOP
                    BEGIN
                        -- Publish RUNNING immediately so monitoring sees the task.
                        UPDATE row_count_control
                        SET status = 'RUNNING', start_time = SYSTIMESTAMP
                        WHERE schema_name = r.owner AND table_name = r.table_name;
                        COMMIT;

                        v_sql_count := 'SELECT COUNT(*) FROM "' || r.owner || '"."' || r.table_name || '"';
                        EXECUTE IMMEDIATE v_sql_count INTO v_cnt;

                        write_log(r.owner, r.table_name, v_cnt, NULL);

                        UPDATE row_count_control
                        SET status = 'COMPLETE', end_time = SYSTIMESTAMP, error_message = NULL
                        WHERE schema_name = r.owner AND table_name = r.table_name;
                        COMMIT;
                    EXCEPTION
                        WHEN OTHERS THEN
                            -- One bad table must not stop the batch: record the error
                            -- and move on. A FAILED task is picked up again the next
                            -- time this batch runs (see the staging filter above).
                            v_error_msg := SUBSTR(SQLERRM, 1, 4000);

                            UPDATE row_count_control
                            SET status = 'FAILED', end_time = SYSTIMESTAMP, error_message = v_error_msg
                            WHERE schema_name = r.owner AND table_name = r.table_name;

                            write_log(r.owner, r.table_name, -1, v_error_msg);
                            COMMIT;
                    END;
                END LOOP;
            END count_table_batch;
            ]';
    END create_counting_procedure;

BEGIN
    --------------------------------------------------------------------------------------------------
    -- 0. INITIAL LAUNCH OR RESUME CHECK
    --------------------------------------------------------------------------------------------------
    -- Run the full setup unless all three tables are present. A setup that failed half-way
    -- (DDL cannot be rolled back) is therefore repaired on the next run instead of leaving
    -- COUNT_TABLE_BATCH uncompilable.
    IF NOT (    object_exists('ROW_COUNT_CONTROL')
            AND object_exists('ROW_COUNTS_SCHEDULER_LOG')
            AND object_exists('JOB_STAGING_GTT')) THEN
        initialize_system;
        prepare_workload; -- Populates the queue for the very first time
    END IF;

    create_counting_procedure; -- Ensure procedure is compiled with latest logic

    --------------------------------------------------------------------------------------------------
    -- 1. EXECUTION AND RESTART PROTOCOL
    --------------------------------------------------------------------------------------------------

    DBMS_OUTPUT.PUT_LINE('PHASE 2: EXECUTING RESTART PROTOCOL');

    -- Step A: recycle tasks abandoned in RUNNING state
    reset_stalled_tasks;

    -- Batch count is always derived from the size of the control table.
    EXECUTE IMMEDIATE 'SELECT CEIL(COUNT(*) / :batch_size) FROM row_count_control'
        INTO v_batches_needed
        USING c_batch_size;

    -- Step B: Launch/Relaunch Execution
    DBMS_OUTPUT.PUT_LINE('Launching ' || v_batches_needed || ' parallel jobs...');

    FOR i IN 1..v_batches_needed LOOP
        -- Zero-pad to at least two digits, but never truncate: LPAD(i, 2, '0') would turn
        -- 100 into '10' and collide with the job of batch 10.
        v_job_name := c_job_prefix
                      || LPAD(TO_CHAR(i), GREATEST(2, LENGTH(TO_CHAR(i))), '0');

        -- Always drop before recreating. NOTE: force => TRUE stops a job that is still
        -- running; tasks it leaves in RUNNING state are only recycled by
        -- reset_stalled_tasks once they are older than its threshold.
        drop_job_if_exists(v_job_name);

        -- Create disabled, bind the batch id, then enable so the job starts with its argument.
        DBMS_SCHEDULER.CREATE_JOB (
            job_name            => v_job_name,
            job_type            => 'STORED_PROCEDURE',
            job_action          => 'COUNT_TABLE_BATCH',
            number_of_arguments => 1,
            start_date          => SYSTIMESTAMP,
            repeat_interval     => NULL,
            enabled             => FALSE
        );

        DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE(v_job_name, 1, TO_CHAR(i));
        DBMS_SCHEDULER.ENABLE(v_job_name);

        v_total_submitted := v_total_submitted + 1;
    END LOOP;

    COMMIT;
    DBMS_OUTPUT.PUT_LINE('--- ALL ' || v_total_submitted || ' jobs successfully launched/resumed. ---');

EXCEPTION
    WHEN OTHERS THEN
        -- Report, undo any uncommitted DML, and re-raise so the client sees the failure.
        DBMS_OUTPUT.PUT_LINE('!!! FATAL ERROR IN MASTER BLOCK !!! SQLERRM: ' || SQLERRM);
        ROLLBACK;
        RAISE;
END;
/

A. Query to Identify Targets

First, use a query to view all current failures:

SQL
-- Lists every task currently marked FAILED, grouped visually by error text
-- so that tables sharing the same root cause appear together.
SELECT ctl.schema_name,
       ctl.table_name,
       ctl.error_message
  FROM row_count_control ctl
 WHERE ctl.status = 'FAILED'
 ORDER BY ctl.error_message;

B. The Update Command (Retry Action)

You have two options for the update: retry all failed tasks or retry a specific task.

Option 1: Retry ALL Failed Tasks (Recommended after a global fix)

SQL
-- Puts every FAILED task back into the queue.
-- Wrapped in a PL/SQL block because DBMS_OUTPUT.PUT_LINE and SQL%ROWCOUNT are only
-- valid inside PL/SQL. The row count is read before COMMIT so it still refers to
-- the UPDATE. Run SET SERVEROUTPUT ON first to see the message.
BEGIN
    UPDATE row_count_control
    SET
        status = 'PENDING',
        error_message = 'RETRY PENDING - Root cause fixed.'
    WHERE
        status = 'FAILED';

    DBMS_OUTPUT.PUT_LINE(SQL%ROWCOUNT || ' FAILED tasks have been reset to PENDING.');

    COMMIT;
END;
/

Option 2: Retry a Specific Table (Recommended for testing the fix)

SQL
-- Puts one FAILED task back into the queue (replace the schema and table name).
-- Wrapped in a PL/SQL block because DBMS_OUTPUT.PUT_LINE is only valid inside PL/SQL.
-- The outcome is reported from SQL%ROWCOUNT so a typo in the names, or a task that is
-- not FAILED, is not reported as a success. Run SET SERVEROUTPUT ON first to see the message.
BEGIN
    UPDATE row_count_control
    SET
        status = 'PENDING',
        error_message = 'RETRY PENDING - Permission issue resolved.'
    WHERE
        status = 'FAILED'
        AND schema_name = 'FINANCE'
        AND table_name = 'LEDGER_BALANCE_BIG';

    IF SQL%ROWCOUNT > 0 THEN
        DBMS_OUTPUT.PUT_LINE('Specific table set to PENDING.');
    ELSE
        DBMS_OUTPUT.PUT_LINE('No FAILED task matched that schema/table; nothing was changed.');
    END IF;

    COMMIT;
END;
/
====================

This step clears any tasks abandoned by jobs that crashed but left their status as 'RUNNING'.

Action | Command/Query | Purpose
Recycle Stalled Tasks | EXEC reset_stalled_tasks; | Forces any task stuck in the RUNNING state for over 60 minutes back to the PENDING queue for recycling.

RESET_STALLED_TASKS Procedure


CREATE OR REPLACE PROCEDURE reset_stalled_tasks AS
    -- A task counts as stalled once it has been RUNNING for longer than this many minutes.
    c_max_running_minutes CONSTANT NUMBER := 60;
    -- Number of tasks put back into the queue by this call.
    v_recycled            NUMBER;
BEGIN
    DBMS_OUTPUT.PUT_LINE('--- Initiating Stalled Task Cleanup ---');

    -- Return abandoned tasks to the work queue: any RUNNING row whose start_time
    -- lies further back than the threshold is reset to PENDING.
    UPDATE row_count_control ctl
       SET ctl.status        = 'PENDING',
           ctl.start_time    = NULL,
           ctl.end_time      = NULL,
           ctl.error_message = 'RESET: Task was forcefully recycled from RUNNING state.'
     WHERE ctl.status = 'RUNNING'
       AND ctl.start_time < SYSTIMESTAMP - NUMTODSINTERVAL(c_max_running_minutes, 'MINUTE');

    v_recycled := SQL%ROWCOUNT;

    IF v_recycled = 0 THEN
        DBMS_OUTPUT.PUT_LINE('No stalled tasks found.');
    ELSE
        DBMS_OUTPUT.PUT_LINE('!!! WARNING: ' || v_recycled || ' stalled tasks were reset to PENDING and are ready for retry.');
    END IF;

    COMMIT;

EXCEPTION
    WHEN OTHERS THEN
        -- Report the problem, discard the uncommitted reset, and hand the error to the caller.
        DBMS_OUTPUT.PUT_LINE('FATAL ERROR during stalled task reset: ' || SQLERRM);
        ROLLBACK;
        RAISE;
END;
/

==========

-- One-row progress summary of the work queue in row_count_control.
CREATE OR REPLACE VIEW row_count_summary AS
WITH task_flags AS (
    -- Turn each task's status into 0/1 indicator columns so the outer query only sums.
    SELECT
        CASE status WHEN 'COMPLETE' THEN 1 ELSE 0 END AS is_complete,
        CASE status WHEN 'FAILED'   THEN 1 ELSE 0 END AS is_failed,
        CASE status WHEN 'RUNNING'  THEN 1 ELSE 0 END AS is_running,
        CASE status WHEN 'PENDING'  THEN 1 ELSE 0 END AS is_pending,
        end_time
    FROM row_count_control
)
SELECT
    -- Number of tasks in the queue
    COUNT(*)         AS total_tasks,

    -- Tasks per status
    SUM(is_complete) AS successful_count,
    SUM(is_failed)   AS failed_count,
    SUM(is_running)  AS running_count,
    SUM(is_pending)  AS pending_count,

    -- Share of COMPLETE tasks in percent, one decimal; GREATEST guards the division
    -- when the queue is empty
    ROUND(SUM(is_complete) / GREATEST(COUNT(*), 1) * 100, 1) AS pct_complete,

    -- Most recent end_time of any task (set for both COMPLETE and FAILED tasks)
    MAX(end_time)    AS last_completion_time
FROM
    task_flags;


-- Overall progress at a glance: a single labelled row read from the summary view.
SELECT 'TOTAL WORKLOAD STATUS'                                   AS metric_group,
       smry.total_tasks,
       smry.successful_count,
       smry.failed_count,
       smry.running_count,
       smry.pending_count,
       smry.pct_complete,
       TO_CHAR(smry.last_completion_time, 'YYYY-MM-DD HH24:MI:SS') AS last_update
  FROM row_count_summary smry;


-- Progress per schema, least-finished schemas first.
SELECT ctl.schema_name,
       COUNT(*)                                                AS total_tables,
       SUM(CASE ctl.status WHEN 'COMPLETE' THEN 1 ELSE 0 END)  AS done_count,
       SUM(CASE ctl.status WHEN 'FAILED'   THEN 1 ELSE 0 END)  AS failed_count,
       SUM(CASE ctl.status WHEN 'PENDING'  THEN 1 ELSE 0 END)  AS pending_count,
       -- COUNT(*) is never zero inside a group, so the division is safe
       ROUND(SUM(CASE ctl.status WHEN 'COMPLETE' THEN 1 ELSE 0 END) / COUNT(*) * 100, 1) AS pct_done
  FROM row_count_control ctl
 GROUP BY ctl.schema_name
 ORDER BY pct_done, ctl.schema_name;



-- Task status side by side with the logged result for the same table.
-- The outer join keeps tasks that have no log row yet (their count columns are NULL).
SELECT ctl.schema_name,
       ctl.table_name,
       ctl.status                                          AS control_status,
       ctl.job_partition_id                                AS current_batch,
       TO_CHAR(lg.row_count, '999,999,999,999')            AS final_row_count,
       TO_CHAR(lg.count_timestamp, 'YYYY-MM-DD HH24:MI:SS') AS count_time
  FROM row_count_control ctl
  LEFT OUTER JOIN row_counts_scheduler_log lg
    ON lg.schema_name = ctl.schema_name
   AND lg.table_name  = ctl.table_name
 WHERE
       -- Filter on one table; swap in the commented predicate below to list open work instead
       ctl.table_name = 'YOUR_SPECIFIC_TABLE_NAME'
       -- ctl.status IN ('RUNNING', 'PENDING')
 ORDER BY ctl.status DESC, ctl.schema_name, ctl.table_name;


To query the row count for a single table using the data you've collected, query the row_counts_scheduler_log table directly.

Here is the most efficient and accurate query:

SQL
-- Latest successful row count recorded for one table.
SELECT lg.schema_name,
       lg.table_name,
       TO_CHAR(lg.row_count, '999,999,999,999')             AS final_row_count,
       TO_CHAR(lg.count_timestamp, 'YYYY-MM-DD HH24:MI:SS')  AS count_time
  FROM row_counts_scheduler_log lg
 WHERE lg.schema_name = 'YOUR_SCHEMA_NAME'   -- replace with the target schema (e.g., 'HR')
   AND lg.table_name  = 'YOUR_TABLE_NAME'    -- replace with the target table name
   AND lg.row_count  >= 0                    -- failed counts are logged as -1; skip them
 ORDER BY lg.count_timestamp DESC
 FETCH FIRST 1 ROWS ONLY;

No comments: