SET SERVEROUTPUT ON SIZE UNLIMITED
WHENEVER SQLERROR EXIT FAILURE
DECLARE
-- Configuration variables
c_batch_size CONSTANT NUMBER := 40;
c_job_prefix CONSTANT VARCHAR2(15) := 'BATCH_COUNT_';
-- Runtime Variables
v_total_source_count NUMBER;
v_batches_needed NUMBER;
v_job_name VARCHAR2(128);
v_ddl_statement VARCHAR2(4000);
v_total_submitted NUMBER := 0;
-- Helper procedure to safely attempt DDL destruction (IGNORES 'DOES NOT EXIST')
PROCEDURE safe_execute_drop(p_sql IN VARCHAR2) IS
BEGIN
EXECUTE IMMEDIATE p_sql;
EXCEPTION
WHEN OTHERS THEN
-- ORA-00942 (table/view does not exist)
-- ORA-04043 (object does not exist)
IF SQLCODE NOT IN (-942, -4043) THEN
RAISE;
END IF;
END safe_execute_drop;
-- P1. Initializes DDL and Structural Integrity (Indestructible Setup)
PROCEDURE initialize_system IS
BEGIN
DBMS_OUTPUT.PUT_LINE('--- PHASES 0 & 1: FULL SYSTEM INITIALIZATION ---');
-- 1. INDESTRUCTIBLE CLEANUP PHASE: Attempt to drop everything, ignore errors
safe_execute_drop('DROP VIEW row_count_summary');
safe_execute_drop('DROP TABLE row_count_control CASCADE CONSTRAINTS');
safe_execute_drop('DROP TABLE row_counts_scheduler_log CASCADE CONSTRAINTS');
safe_execute_drop('DROP TABLE job_staging_gtt');
-- 2. Creation Phase (Must succeed now that dependencies are gone)
EXECUTE IMMEDIATE '
CREATE TABLE row_count_control (
schema_name VARCHAR2(128) NOT NULL, table_name VARCHAR2(128) NOT NULL,
job_partition_id NUMBER NOT NULL, status VARCHAR2(10) DEFAULT ''PENDING'' NOT NULL,
start_time TIMESTAMP, end_time TIMESTAMP, error_message VARCHAR2(4000),
CONSTRAINT pk_control PRIMARY KEY (schema_name, table_name)
)';
EXECUTE IMMEDIATE '
CREATE TABLE row_counts_scheduler_log (
schema_name VARCHAR2(128) NOT NULL, table_name VARCHAR2(128) NOT NULL,
row_count NUMBER, count_timestamp TIMESTAMP, job_name VARCHAR2(128),
batch_id NUMBER, error_message VARCHAR2(4000),
CONSTRAINT pk_final_log PRIMARY KEY (schema_name, table_name)
)';
EXECUTE IMMEDIATE '
CREATE GLOBAL TEMPORARY TABLE job_staging_gtt (
owner VARCHAR2(128), table_name VARCHAR2(128)
) ON COMMIT PRESERVE ROWS';
-- Create Summary View
v_ddl_statement := '
CREATE OR REPLACE VIEW row_count_summary AS
SELECT
COUNT(*) AS total_tasks, SUM(CASE WHEN status = ''COMPLETE'' THEN 1 ELSE 0 END) AS success_count,
SUM(CASE WHEN status = ''FAILED'' THEN 1 ELSE 0 END) AS failed_count, SUM(CASE WHEN status = ''RUNNING'' THEN 1 ELSE 0 END) AS running_count,
ROUND(SUM(CASE WHEN status = ''COMPLETE'' THEN 1 ELSE 0 END) / GREATEST(COUNT(*),1) * 100, 1) AS pct_complete,
MAX(end_time) AS last_completion
FROM row_count_control';
EXECUTE IMMEDIATE v_ddl_statement;
COMMIT;
DBMS_OUTPUT.PUT_LINE(' -> SYSTEM DDL SETUP COMPLETE.');
END initialize_system;
-- P2. Populates the Master Control Table using NTILE
PROCEDURE prepare_workload IS
v_total_count NUMBER;
v_ddl_insert VARCHAR2(4000);
BEGIN
SELECT COUNT(*) INTO v_total_count
FROM all_tables WHERE owner IN ('HR','SALES','FINANCE') AND table_name NOT LIKE 'BIN$%';
v_batches_needed := CEIL(v_total_count / c_batch_size);
DELETE FROM row_count_control; -- Clear old work data
v_ddl_insert := '
INSERT INTO row_count_control (schema_name, table_name, job_partition_id)
SELECT
owner, table_name,
NTILE(:num_batches) OVER (ORDER BY owner, table_name) AS job_partition_id
FROM
all_tables
WHERE
owner IN (''HR'',''SALES'',''FINANCE'') AND table_name NOT LIKE ''BIN$%''';
EXECUTE IMMEDIATE v_ddl_insert USING v_batches_needed;
COMMIT;
DBMS_OUTPUT.PUT_LINE(' -> WORKLOAD PREPARED. ' || v_total_count || ' tasks partitioned into ' || v_batches_needed || ' batches.');
END prepare_workload;
-- P3. Clears abandoned 'RUNNING' tasks
PROCEDURE reset_stalled_tasks IS
v_stale_threshold_minutes CONSTANT NUMBER := 60;
BEGIN
-- Resets stalled tasks to PENDING for recycling
UPDATE row_count_control
SET status = 'PENDING', start_time = NULL, end_time = NULL, error_message = 'RESET: Task was forcefully recycled from RUNNING state.'
WHERE status = 'RUNNING'
AND start_time < SYSTIMESTAMP - INTERVAL '1' MINUTE * v_stale_threshold_minutes;
COMMIT;
END reset_stalled_tasks;
-- P4. Creates the core counting procedure (The Engine)
PROCEDURE create_counting_procedure IS
BEGIN
v_ddl_statement := '
CREATE OR REPLACE PROCEDURE count_table_batch(p_batch_id NUMBER) AS
v_sql_staging_filter VARCHAR2(4000); v_sql_count VARCHAR2(200); v_cnt NUMBER;
v_job_name_log CONSTANT VARCHAR2(30) := ''BATCH_'' || p_batch_id; v_error_msg VARCHAR2(4000);
CURSOR c_staged_tables IS SELECT owner, table_name FROM job_staging_gtt;
BEGIN
-- Phase 1: STAGE WORKLOAD
EXECUTE IMMEDIATE ''TRUNCATE TABLE job_staging_gtt'';
v_sql_staging_filter := ''
INSERT INTO job_staging_gtt (owner, table_name)
SELECT schema_name, table_name
FROM row_count_control
WHERE job_partition_id = :p_batch_id AND status IN (''''PENDING'''', ''''FAILED'''')
'';
EXECUTE IMMEDIATE v_sql_staging_filter USING p_batch_id; -- Includes PENDING and FAILED for retry
COMMIT;
-- Phase 2: PROCESS STAGED WORKLOAD
FOR r IN c_staged_tables LOOP
BEGIN
-- Update status to RUNNING immediately
UPDATE row_count_control SET status = ''RUNNING'', start_time = SYSTIMESTAMP
WHERE schema_name = r.owner AND table_name = r.table_name;
COMMIT;
v_sql_count := ''SELECT COUNT(*) FROM "''||r.owner||''"."''||r.table_name||''"'';
EXECUTE IMMEDIATE v_sql_count INTO v_cnt;
-- LOG SUCCESS and Mark COMPLETE
DELETE FROM row_counts_scheduler_log WHERE schema_name = r.owner AND table_name = r.table_name;
INSERT INTO row_counts_scheduler_log (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
VALUES (r.owner, r.table_name, v_cnt, SYSTIMESTAMP, v_job_name_log, p_batch_id, NULL);
UPDATE row_count_control SET status = ''COMPLETE'', end_time = SYSTIMESTAMP, error_message = NULL
WHERE schema_name = r.owner AND table_name = r.table_name;
COMMIT;
EXCEPTION
WHEN OTHERS THEN
v_error_msg := SUBSTR(SQLERRM, 1, 4000);
-- Mark task as FAILED (prevents automatic retries until DBA intervenes)
UPDATE row_count_control SET status = ''FAILED'', end_time = SYSTIMESTAMP, error_message = v_error_msg
WHERE schema_name = r.owner AND table_name = r.table_name;
INSERT INTO row_counts_scheduler_log (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
VALUES (r.owner, r.table_name, -1, SYSTIMESTAMP, v_job_name_log, p_batch_id, v_error_msg);
COMMIT;
END;
END LOOP;
END;
';
EXECUTE IMMEDIATE v_ddl_statement;
END create_counting_procedure;
BEGIN
--------------------------------------------------------------------------------------------------
-- 0. INITIAL LAUNCH OR RESUME CHECK
--------------------------------------------------------------------------------------------------
IF NOT object_exists('ROW_COUNT_CONTROL') THEN
initialize_system;
prepare_workload; -- Populates the queue for the very first time
END IF;
create_counting_procedure; -- Ensure procedure is compiled with latest logic
--------------------------------------------------------------------------------------------------
-- 1. EXECUTION AND RESTART PROTOCOL
--------------------------------------------------------------------------------------------------
DBMS_OUTPUT.PUT_LINE('PHASE 2: EXECUTING RESTART PROTOCOL');
-- Step A: Clear abandoned locks and recycle tasks
reset_stalled_tasks;
-- Get final batch count (always based on the total control table size)
SELECT CEIL(COUNT(*) / c_batch_size) INTO v_batches_needed
FROM row_count_control;
-- Step B: Launch/Relaunch Execution
DBMS_OUTPUT.PUT_LINE('Launching ' || v_batches_needed || ' parallel jobs...');
FOR i IN 1..v_batches_needed LOOP
v_job_name := c_job_prefix || LPAD(i, 2, '0');
-- Cleanup old job definition (Always drop before recreating for safety)
execute_ddl('BEGIN DBMS_SCHEDULER.DROP_JOB(''' || v_job_name || ''', TRUE); END;');
-- Create, Set Argument, and Enable NEW JOB
DBMS_SCHEDULER.CREATE_JOB (
job_name => v_job_name, job_type => 'STORED_PROCEDURE', job_action => 'COUNT_TABLE_BATCH',
number_of_arguments => 1, start_date => SYSTIMESTAMP, repeat_interval => NULL, enabled => FALSE
);
DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE(v_job_name, 1, TO_CHAR(i));
DBMS_SCHEDULER.ENABLE(v_job_name);
v_total_submitted := v_total_submitted + 1;
END LOOP;
COMMIT;
DBMS_OUTPUT.PUT_LINE('--- ALL ' || v_total_submitted || ' jobs successfully launched/resumed. ---');
EXCEPTION
WHEN OTHERS THEN
DBMS_OUTPUT.PUT_LINE('!!! FATAL ERROR IN MASTER BLOCK !!! SQLERRM: ' || SQLERRM);
ROLLBACK;
RAISE;
END;
/
No comments:
Post a Comment