SET SERVEROUTPUT ON SIZE UNLIMITED

-- =============================================================================
-- Parallel row-count harness (Oracle PL/SQL, run from SQL*Plus / SQLcl).
--
-- Phase 1: drops and recreates the work-queue table (row_count_control), the
--          result table (row_counts_scheduler_log) and a session-private
--          staging GTT (job_staging_gtt); splits the tables owned by
--          HR / SALES / FINANCE into batches of roughly c_batch_size tables
--          with NTILE; and (re)creates the worker procedure count_table_batch.
-- Phase 2: submits one DBMS_SCHEDULER job per batch, each running
--          count_table_batch(<batch id>).
--
-- WARNING: this script is destructive - previous queue and result tables are
-- dropped on every run.
-- =============================================================================
DECLARE
    -- Configuration constants
    c_batch_size CONSTANT NUMBER       := 50;  -- Final safe batch size
    c_job_prefix CONSTANT VARCHAR2(15) := 'BATCH_COUNT_';

    -- Runtime variables
    v_total_source_count NUMBER;
    v_batches_needed     NUMBER;
    v_job_name           VARCHAR2(128);
    -- Sized to the PL/SQL VARCHAR2 maximum so the formatted source of
    -- count_table_batch can never overflow the buffer.
    v_ddl_statement      VARCHAR2(32767);

    -- Runs a dynamic statement, swallowing only the "object does not exist"
    -- family of errors so cleanup is idempotent:
    --   ORA-00942 table/view, ORA-02443 constraint, ORA-01418 index,
    --   ORA-02289 sequence,   ORA-01432 public synonym, ORA-27475 scheduler job.
    -- Any other error is re-raised to the caller.
    PROCEDURE execute_ddl(p_sql IN VARCHAR2) IS
    BEGIN
        EXECUTE IMMEDIATE p_sql;
    EXCEPTION
        WHEN OTHERS THEN
            -- Ignore "object does not exist" errors during cleanup
            IF SQLCODE NOT IN (-942, -2443, -1418, -2289, -1432, -27475) THEN
                RAISE;
            END IF;
    END execute_ddl;
BEGIN
    DBMS_OUTPUT.PUT_LINE('===================================================================');
    DBMS_OUTPUT.PUT_LINE('PHASE 1: SYSTEM SETUP AND WORK QUEUE INITIALIZATION');
    DBMS_OUTPUT.PUT_LINE('===================================================================');

    ----------------------------------------------------------------------------
    -- A. CLEANUP AND DDL SETUP
    ----------------------------------------------------------------------------
    execute_ddl('DROP VIEW row_count_summary');
    execute_ddl('DROP TABLE row_count_control CASCADE CONSTRAINTS');
    execute_ddl('DROP TABLE row_counts_scheduler_log CASCADE CONSTRAINTS');
    execute_ddl('DROP TABLE job_staging_gtt');

    -- Create Master Control Table (Queue): one row per table to be counted
    EXECUTE IMMEDIATE
        'CREATE TABLE row_count_control (
            schema_name      VARCHAR2(128) NOT NULL,
            table_name       VARCHAR2(128) NOT NULL,
            job_partition_id NUMBER        NOT NULL,
            status           VARCHAR2(10)  DEFAULT ''PENDING'' NOT NULL,
            start_time       TIMESTAMP,
            end_time         TIMESTAMP,
            error_message    VARCHAR2(4000),
            CONSTRAINT pk_control PRIMARY KEY (schema_name, table_name)
        )';

    -- Create Permanent Log Table (Result History): latest result per table
    EXECUTE IMMEDIATE
        'CREATE TABLE row_counts_scheduler_log (
            schema_name     VARCHAR2(128) NOT NULL,
            table_name      VARCHAR2(128) NOT NULL,
            row_count       NUMBER,
            count_timestamp TIMESTAMP,
            job_name        VARCHAR2(128),
            batch_id        NUMBER,
            error_message   VARCHAR2(4000),
            CONSTRAINT pk_final_log PRIMARY KEY (schema_name, table_name)
        )';

    -- Create Global Temporary Table (Private Session Staging)
    EXECUTE IMMEDIATE
        'CREATE GLOBAL TEMPORARY TABLE job_staging_gtt (
            owner      VARCHAR2(128),
            table_name VARCHAR2(128)
        ) ON COMMIT PRESERVE ROWS';

    DBMS_OUTPUT.PUT_LINE('Base tables and GTT created.');

    ----------------------------------------------------------------------------
    -- B. WORKLOAD POPULATION (NTILE - The Load Balancing Fix)
    ----------------------------------------------------------------------------
    -- 1. Calculate total tables and batches needed
    SELECT COUNT(*)
    INTO v_total_source_count
    FROM all_tables
    WHERE owner IN ('HR', 'SALES', 'FINANCE')
      AND table_name NOT LIKE 'BIN$%';

    v_batches_needed := CEIL(v_total_source_count / c_batch_size);

    IF v_batches_needed = 0 THEN
        -- NTILE requires a positive bucket count, so there is nothing to
        -- partition when no source table is visible. The job loop below then
        -- iterates zero times.
        DBMS_OUTPUT.PUT_LINE('No source tables found; work queue left empty.');
    ELSE
        -- 2. Insert all tables using NTILE for partitioning.
        --    NTILE divides the tables into v_batches_needed groups (1 to N).
        --    Dynamic SQL is required because row_count_control did not exist
        --    when this block was compiled.
        v_ddl_statement :=
            'INSERT INTO row_count_control (schema_name, table_name, job_partition_id)
             SELECT
                 owner,
                 table_name,
                 NTILE(:num_batches) OVER (ORDER BY owner, table_name) AS job_partition_id
             FROM all_tables
             WHERE owner IN (''HR'', ''SALES'', ''FINANCE'')
               AND table_name NOT LIKE ''BIN$%''';
        EXECUTE IMMEDIATE v_ddl_statement USING v_batches_needed;
        COMMIT;

        DBMS_OUTPUT.PUT_LINE('Workload partitioned into ' || v_batches_needed || ' batches using NTILE.');
    END IF;

    ----------------------------------------------------------------------------
    -- C. CREATE THE CORE PROCEDURE (Job Engine)
    ----------------------------------------------------------------------------
    -- Created dynamically because it references tables built earlier in this
    -- same block. q-quoting keeps the embedded source free of doubled quotes.
    v_ddl_statement := q'~CREATE OR REPLACE PROCEDURE count_table_batch(p_batch_id NUMBER) AS
    -- Counts the rows of every PENDING table assigned to batch p_batch_id.
    -- Each table is processed and committed independently; a failure on one
    -- table is recorded (status FAILED, row_count -1) and the loop continues.
    v_sql_staging_filter VARCHAR2(4000);
    -- 22 fixed characters plus two identifiers of up to 128 bytes and their
    -- quoting can reach 282 characters.
    v_sql_count          VARCHAR2(400);
    v_cnt                NUMBER;
    v_job_name_log       CONSTANT VARCHAR2(30) := 'BATCH_' || p_batch_id;
    v_error_msg          VARCHAR2(4000);
    CURSOR c_staged_tables IS
        SELECT owner, table_name FROM job_staging_gtt;
BEGIN
    -- 1. STAGE WORKLOAD: Identify and insert PENDING work into GTT
    EXECUTE IMMEDIATE 'TRUNCATE TABLE job_staging_gtt';
    v_sql_staging_filter :=
        'INSERT INTO job_staging_gtt (owner, table_name)
         SELECT schema_name, table_name
         FROM row_count_control
         WHERE job_partition_id = :p_batch_id AND status = ''PENDING''';
    EXECUTE IMMEDIATE v_sql_staging_filter USING p_batch_id;
    COMMIT;

    -- 2. PROCESS STAGED WORKLOAD
    FOR r IN c_staged_tables LOOP
        BEGIN
            -- Update status to RUNNING immediately (for monitoring and stall detection)
            UPDATE row_count_control
            SET status = 'RUNNING', start_time = SYSTIMESTAMP
            WHERE schema_name = r.owner AND table_name = r.table_name;
            COMMIT;

            -- COUNT (The heavy sequential operation)
            v_sql_count := 'SELECT COUNT(*) FROM "' || r.owner || '"."' || r.table_name || '"';
            EXECUTE IMMEDIATE v_sql_count INTO v_cnt;

            -- LOG SUCCESS and Mark COMPLETE
            DELETE FROM row_counts_scheduler_log
            WHERE schema_name = r.owner AND table_name = r.table_name;

            INSERT INTO row_counts_scheduler_log
                (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
            VALUES
                (r.owner, r.table_name, v_cnt, SYSTIMESTAMP, v_job_name_log, p_batch_id, NULL);

            UPDATE row_count_control
            SET status = 'COMPLETE', end_time = SYSTIMESTAMP, error_message = NULL
            WHERE schema_name = r.owner AND table_name = r.table_name;
            COMMIT;
        EXCEPTION
            WHEN OTHERS THEN
                -- SQLERRM cannot be referenced directly inside SQL statements
                v_error_msg := SUBSTR(SQLERRM, 1, 4000);

                UPDATE row_count_control
                SET status = 'FAILED', end_time = SYSTIMESTAMP, error_message = v_error_msg
                WHERE schema_name = r.owner AND table_name = r.table_name;

                -- Remove any earlier (or uncommitted, partially written) log row
                -- first; otherwise the insert below violates pk_final_log and
                -- the exception would escape the handler and abort the batch.
                DELETE FROM row_counts_scheduler_log
                WHERE schema_name = r.owner AND table_name = r.table_name;

                INSERT INTO row_counts_scheduler_log
                    (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
                VALUES
                    (r.owner, r.table_name, -1, SYSTIMESTAMP, v_job_name_log, p_batch_id, v_error_msg);
                COMMIT;
        END;
    END LOOP;
END;~';
    EXECUTE IMMEDIATE v_ddl_statement;
    DBMS_OUTPUT.PUT_LINE('Procedure count_table_batch created.');

    ----------------------------------------------------------------------------
    -- D. JOB SUBMISSION AND EXECUTION
    ----------------------------------------------------------------------------
    DBMS_OUTPUT.PUT_LINE('===================================================================');
    DBMS_OUTPUT.PUT_LINE('PHASE 2: JOB SUBMISSION (Launching ' || v_batches_needed || ' Parallel Jobs)');
    DBMS_OUTPUT.PUT_LINE('===================================================================');

    -- Reset stalled tasks before submission to recycle any abandoned jobs.
    -- reset_stalled_tasks is not created by this script, so treat it as
    -- optional: when the name cannot be resolved the anonymous block fails to
    -- compile with ORA-06550, which is skipped here instead of aborting the
    -- whole run. Any other error still propagates.
    BEGIN
        execute_ddl('BEGIN reset_stalled_tasks; END;');
    EXCEPTION
        WHEN OTHERS THEN
            IF SQLCODE = -6550 THEN
                DBMS_OUTPUT.PUT_LINE('reset_stalled_tasks is not available; skipping stalled-task reset.');
            ELSE
                RAISE;
            END IF;
    END;

    FOR i IN 1 .. v_batches_needed LOOP
        -- Zero-pad to at least two digits. The pad width grows with the number
        -- because LPAD truncates values longer than the requested width
        -- (LPAD(100, 2, '0') = '10'), which would produce duplicate job names
        -- once there are 100 or more batches.
        v_job_name := c_job_prefix || LPAD(TO_CHAR(i), GREATEST(2, LENGTH(TO_CHAR(i))), '0');

        -- Cleanup old job definitions (an unknown job is silently ignored)
        execute_ddl('BEGIN DBMS_SCHEDULER.DROP_JOB(''' || v_job_name || ''', TRUE); END;');

        -- Create, Set Argument, and Enable (The robust sequence): the job must
        -- be created disabled so its argument can be set before it starts.
        DBMS_SCHEDULER.CREATE_JOB(
            job_name            => v_job_name,
            job_type            => 'STORED_PROCEDURE',
            job_action          => 'COUNT_TABLE_BATCH',
            number_of_arguments => 1,
            start_date          => SYSTIMESTAMP,
            repeat_interval     => NULL,
            enabled             => FALSE
        );

        -- Use TO_CHAR(i) to avoid PLS-00307 ambiguity
        DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE(v_job_name, 1, TO_CHAR(i));
        DBMS_SCHEDULER.ENABLE(v_job_name);
        -- DBMS_OUTPUT.PUT_LINE(' -> Submitted Job: ' || v_job_name);
    END LOOP;

    COMMIT;
    DBMS_OUTPUT.PUT_LINE('All ' || v_batches_needed || ' jobs submitted and running in parallel!');
EXCEPTION
    WHEN OTHERS THEN
        DBMS_OUTPUT.PUT_LINE('!!! FATAL ERROR DURING EXECUTION !!!');
        DBMS_OUTPUT.PUT_LINE('SQLERRM: ' || SQLERRM);
        ROLLBACK;
        RAISE;
END;
/
Sunday, October 19, 2025
using ole
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment