Sunday, October 19, 2025

using ole

SET SERVEROUTPUT ON SIZE UNLIMITED

DECLARE
    -- Configuration Constants
    c_batch_size        CONSTANT NUMBER := 50;  -- Final safe batch size
    c_job_prefix        CONSTANT VARCHAR2(15) := 'BATCH_COUNT_';

    -- Runtime Variables
    v_total_source_count NUMBER;
    v_batches_needed     NUMBER;
    v_job_name           VARCHAR2(128);
    v_ddl_statement      VARCHAR2(4000);
    
    -- Helper procedure to execute DDL safely
    PROCEDURE execute_ddl(p_sql IN VARCHAR2) IS
    BEGIN
        EXECUTE IMMEDIATE p_sql;
    EXCEPTION
        WHEN OTHERS THEN
            -- Ignore "object does not exist" errors during cleanup
            IF SQLCODE NOT IN (-942, -2443, -1418, -2289, -1432, -27475) THEN
                RAISE;
            END IF;
    END;

BEGIN
    DBMS_OUTPUT.PUT_LINE('===================================================================');
    DBMS_OUTPUT.PUT_LINE('PHASE 1: SYSTEM SETUP AND WORK QUEUE INITIALIZATION');
    DBMS_OUTPUT.PUT_LINE('===================================================================');

    --------------------------------------------------------------------------------------
    -- A. CLEANUP AND DDL SETUP
    --------------------------------------------------------------------------------------
    execute_ddl('DROP VIEW row_count_summary');
    execute_ddl('DROP TABLE row_count_control CASCADE CONSTRAINTS');
    execute_ddl('DROP TABLE row_counts_scheduler_log CASCADE CONSTRAINTS'); 
    execute_ddl('DROP TABLE job_staging_gtt');

    -- Create Master Control Table (Queue)
    EXECUTE IMMEDIATE '
        CREATE TABLE row_count_control (
            schema_name VARCHAR2(128) NOT NULL,
            table_name VARCHAR2(128) NOT NULL,
            job_partition_id NUMBER NOT NULL, 
            status VARCHAR2(10) DEFAULT ''PENDING'' NOT NULL, 
            start_time TIMESTAMP, end_time TIMESTAMP, error_message VARCHAR2(4000),
            CONSTRAINT pk_control PRIMARY KEY (schema_name, table_name)
        )';
    -- Create Permanent Log Table (Result History)
    EXECUTE IMMEDIATE '
        CREATE TABLE row_counts_scheduler_log (
            schema_name VARCHAR2(128) NOT NULL,
            table_name VARCHAR2(128) NOT NULL,
            row_count NUMBER, count_timestamp TIMESTAMP, job_name VARCHAR2(128),
            batch_id NUMBER, error_message VARCHAR2(4000),
            CONSTRAINT pk_final_log PRIMARY KEY (schema_name, table_name)
        )';
    -- Create Global Temporary Table (Private Session Staging)
    EXECUTE IMMEDIATE '
        CREATE GLOBAL TEMPORARY TABLE job_staging_gtt (
            owner VARCHAR2(128), table_name VARCHAR2(128)
        ) ON COMMIT PRESERVE ROWS';
    DBMS_OUTPUT.PUT_LINE('Base tables and GTT created.');

    --------------------------------------------------------------------------------------
    -- B. WORKLOAD POPULATION (NTILE - The Load Balancing Fix)
    --------------------------------------------------------------------------------------
    -- 1. Calculate total tables and batches needed
    SELECT COUNT(*) INTO v_total_source_count 
    FROM all_tables WHERE owner IN ('HR','SALES','FINANCE') AND table_name NOT LIKE 'BIN$%';
    
    v_batches_needed := CEIL(v_total_source_count / c_batch_size);

    -- 2. Insert all tables using NTILE for partitioning
    v_ddl_statement := '
        INSERT INTO row_count_control (schema_name, table_name, job_partition_id)
        SELECT
            owner,
            table_name,
            -- NTILE divides the tables into V_BATCHES_NEEDED groups (1 to N)
            NTILE(:num_batches) OVER (ORDER BY owner, table_name) AS job_partition_id
        FROM
            all_tables
        WHERE
            owner IN (''HR'',''SALES'',''FINANCE'') AND table_name NOT LIKE ''BIN$%''';
            
    EXECUTE IMMEDIATE v_ddl_statement USING v_batches_needed;
    COMMIT;
    DBMS_OUTPUT.PUT_LINE('Workload partitioned into ' || v_batches_needed || ' batches using NTILE.');

    --------------------------------------------------------------------------------------
    -- C. CREATE THE CORE PROCEDURE (Job Engine)
    --------------------------------------------------------------------------------------
    v_ddl_statement := '
        CREATE OR REPLACE PROCEDURE count_table_batch(p_batch_id NUMBER) AS
            v_sql_staging_filter VARCHAR2(4000); v_sql_count VARCHAR2(200); v_cnt NUMBER;
            v_job_name_log CONSTANT VARCHAR2(30) := ''BATCH_'' || p_batch_id; v_error_msg VARCHAR2(4000); 
            CURSOR c_staged_tables IS SELECT owner, table_name FROM job_staging_gtt;
        BEGIN
            -- 1. STAGE WORKLOAD: Identify and insert PENDING work into GTT
            EXECUTE IMMEDIATE ''TRUNCATE TABLE job_staging_gtt'';
            v_sql_staging_filter := ''
                INSERT INTO job_staging_gtt (owner, table_name)
                SELECT schema_name, table_name
                FROM row_count_control
                WHERE job_partition_id = :p_batch_id AND status = ''''PENDING''''
                '';
            EXECUTE IMMEDIATE v_sql_staging_filter USING p_batch_id;
            COMMIT;

            -- 2. PROCESS STAGED WORKLOAD
            FOR r IN c_staged_tables LOOP
                BEGIN 
                    -- Update status to RUNNING immediately (for monitoring and stall detection)
                    UPDATE row_count_control SET status = ''RUNNING'', start_time = SYSTIMESTAMP
                    WHERE schema_name = r.owner AND table_name = r.table_name;
                    COMMIT;

                    -- COUNT (The heavy sequential operation)
                    v_sql_count := ''SELECT COUNT(*) FROM "''||r.owner||''"."''||r.table_name||''"'';
                    EXECUTE IMMEDIATE v_sql_count INTO v_cnt;
                    
                    -- LOG SUCCESS and Mark COMPLETE
                    DELETE FROM row_counts_scheduler_log WHERE schema_name = r.owner AND table_name = r.table_name;
                    INSERT INTO row_counts_scheduler_log (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
                    VALUES (r.owner, r.table_name, v_cnt, SYSTIMESTAMP, v_job_name_log, p_batch_id, NULL);

                    UPDATE row_count_control SET status = ''COMPLETE'', end_time = SYSTIMESTAMP, error_message = NULL
                    WHERE schema_name = r.owner AND table_name = r.table_name;
                    COMMIT;

                EXCEPTION 
                    WHEN OTHERS THEN 
                        v_error_msg := SUBSTR(SQLERRM, 1, 4000);
                        UPDATE row_count_control SET status = ''FAILED'', end_time = SYSTIMESTAMP, error_message = v_error_msg
                        WHERE schema_name = r.owner AND table_name = r.table_name;
                        INSERT INTO row_counts_scheduler_log (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
                        VALUES (r.owner, r.table_name, -1, SYSTIMESTAMP, v_job_name_log, p_batch_id, v_error_msg);
                        COMMIT; 
                END;
            END LOOP;
        END;
        ';
    EXECUTE IMMEDIATE v_ddl_statement;
    DBMS_OUTPUT.PUT_LINE('Procedure count_table_batch created.');

    --------------------------------------------------------------------------------------
    -- D. JOB SUBMISSION AND EXECUTION
    --------------------------------------------------------------------------------------
    DBMS_OUTPUT.PUT_LINE('===================================================================');
    DBMS_OUTPUT.PUT_LINE('PHASE 2: JOB SUBMISSION (Launching ' || v_batches_needed || ' Parallel Jobs)');
    DBMS_OUTPUT.PUT_LINE('===================================================================');

    -- Reset stalled tasks before submission to recycle any abandoned jobs
    execute_ddl('BEGIN reset_stalled_tasks; END;');

    FOR i IN 1..v_batches_needed LOOP
        v_job_name := c_job_prefix || LPAD(i, 2, '0');
        
        -- Cleanup old job definitions
        execute_ddl('BEGIN DBMS_SCHEDULER.DROP_JOB(''' || v_job_name || ''', TRUE); END;'); 

        -- Create, Set Argument, and Enable (The robust sequence)
        DBMS_SCHEDULER.CREATE_JOB (
            job_name => v_job_name, job_type => 'STORED_PROCEDURE', job_action => 'COUNT_TABLE_BATCH',
            number_of_arguments => 1, start_date => SYSTIMESTAMP, repeat_interval => NULL, enabled => FALSE 
        );
        
        -- Use TO_CHAR(i) to avoid PLS-00307 ambiguity
        DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE(v_job_name, 1, TO_CHAR(i));
        DBMS_SCHEDULER.ENABLE(v_job_name);
        
        -- DBMS_OUTPUT.PUT_LINE('  -> Submitted Job: ' || v_job_name);
    END LOOP;
    
    COMMIT;
    DBMS_OUTPUT.PUT_LINE('All ' || v_batches_needed || ' jobs submitted and running in parallel!');

EXCEPTION
    WHEN OTHERS THEN
        DBMS_OUTPUT.PUT_LINE('!!! FATAL ERROR DURING EXECUTION !!!');
        DBMS_OUTPUT.PUT_LINE('SQLERRM: ' || SQLERRM);
        ROLLBACK;
        RAISE;
END;
/

No comments: