Friday, June 12, 2026

PG_DROP

 

-- =============================================================================

SET SERVEROUTPUT ON SIZE UNLIMITED;

/*
 * Script  : kill_then_disconnect_sessions.sql
 * Purpose : Two-phase session termination for a target user:
 *           Phase 1 - Issues ALTER SYSTEM KILL SESSION IMMEDIATE.
 *           Phase 2 - Only if Phase 1 succeeds and session is confirmed
 *                     KILLED, issues ALTER SYSTEM DISCONNECT SESSION
 *                     IMMEDIATE to destroy the server process.
 *           If Phase 2 fails, the OS Process ID (SPID) is reported
 *           for escalation to AWS Support.
 * Usage   : Set l_target_user to the schema username before executing.
 * Notes   : Requires ALTER SYSTEM privilege.
 *           Excludes background processes and the current session.
 *           Target sessions must NOT already be in KILLED status;
 *           this script handles active/inactive sessions and drives
 *           them through the full termination lifecycle.
 */

DECLARE

   -- ----------------------------------------------------------------
   -- Configuration
   -- ----------------------------------------------------------------
   l_target_user    VARCHAR2(30)  := 'YOUR_TARGET_USERNAME';  -- << CHANGE THIS

   -- ----------------------------------------------------------------
   -- Local variables
   -- ----------------------------------------------------------------
   l_killed_status  v\\)session.status%TYPE;
   l_spid           v\\(process.spid%TYPE;
   l_kill_success   BOOLEAN;
   l_session_count  PLS_INTEGER   := 0;

   -- ----------------------------------------------------------------
   -- Cursor: Selects active/inactive sessions for the target user.
   --         Excludes sessions already KILLED, background processes,
   --         and the current session running this script.
   -- ----------------------------------------------------------------
   CURSOR c_target_sessions IS
      SELECT sid,
             serial#,
             status,
             module
        FROM v\\)session
       WHERE username = UPPER(l_target_user)
         AND status  != 'KILLED'
         AND type    != 'BACKGROUND'
         AND sid     != SYS_CONTEXT('USERENV', 'SID');

BEGIN

   DBMS_OUTPUT.PUT_LINE(
      'Starting two-phase termination for user: ' || l_target_user
   );
   DBMS_OUTPUT.PUT_LINE(
      '=================================================='
   );

   FOR r IN c_target_sessions
   LOOP

      l_session_count  := l_session_count + 1;
      l_kill_success   := FALSE;

      DBMS_OUTPUT.PUT_LINE(
         'Processing Session  => SID: '    || r.sid
         || '  |  Serial#: ' || r.serial#
         || '  |  Status: '  || r.status
      );
      DBMS_OUTPUT.PUT_LINE(
         '   Module          => ' || NVL(r.module, 'N/A')
      );

      -- -------------------------------------------------------
      -- PHASE 1: Kill the session
      -- -------------------------------------------------------
      BEGIN

         EXECUTE IMMEDIATE
            'ALTER SYSTEM KILL SESSION '''
            || r.sid     || ','
            || r.serial#
            || ''' IMMEDIATE';

         l_kill_success := TRUE;

         DBMS_OUTPUT.PUT_LINE(
            '   [PHASE 1 - SUCCESS] KILL issued for session ' || r.sid || '.'
         );

      EXCEPTION
         WHEN OTHERS
         THEN
            DBMS_OUTPUT.PUT_LINE(
               '   [PHASE 1 - FAILED]  Could not kill session '
               || r.sid || ': ' || SQLERRM
            );
            DBMS_OUTPUT.PUT_LINE(
               '   [DETAIL]  ' || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE
            );
      END;

      -- -------------------------------------------------------
      -- PHASE 2: Only proceeds if Phase 1 succeeded.
      --          Confirm session is now in KILLED status,
      --          then force-disconnect the server process.
      -- -------------------------------------------------------
      IF l_kill_success
      THEN

         -- Confirm KILLED status before attempting disconnect
         BEGIN

            SELECT status
              INTO l_killed_status
              FROM v\\(session
             WHERE sid     = r.sid
               AND serial# = r.serial#;

         EXCEPTION
            WHEN NO_DATA_FOUND
            THEN
               -- Session already cleaned up by Oracle; nothing more to do
               l_killed_status := 'GONE';
         END;

         IF l_killed_status = 'KILLED'
         THEN

            DBMS_OUTPUT.PUT_LINE(
               '   [PHASE 2 - INFO]    Session ' || r.sid
               || ' confirmed KILLED. Proceeding to disconnect...'
            );

            BEGIN

               EXECUTE IMMEDIATE
                  'ALTER SYSTEM DISCONNECT SESSION '''
                  || r.sid     || ','
                  || r.serial#
                  || ''' IMMEDIATE';

               DBMS_OUTPUT.PUT_LINE(
                  '   [PHASE 2 - SUCCESS] Session ' || r.sid
                  || ' server process disconnected immediately.'
               );

            EXCEPTION
               WHEN OTHERS
               THEN
                  DBMS_OUTPUT.PUT_LINE(
                     '   [PHASE 2 - FAILED]  Could not disconnect session '
                     || r.sid || ': ' || SQLERRM
                  );
                  DBMS_OUTPUT.PUT_LINE(
                     '   [DETAIL]  ' || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE
                  );

                  -- Retrieve OS Process ID for AWS Support escalation
                  BEGIN

                     SELECT p.spid
                       INTO l_spid
                       FROM v\\)process p
                       JOIN v\\(session s
                         ON p.addr = s.paddr
                      WHERE s.sid = r.sid;

                     DBMS_OUTPUT.PUT_LINE(
                        '   [AWS SUPPORT]       OS Process ID (SPID): '
                        || l_spid
                        || '  Provide this to AWS Support for OS-level kill.'
                     );

                  EXCEPTION
                     WHEN NO_DATA_FOUND
                     THEN
                        DBMS_OUTPUT.PUT_LINE(
                           '   [AWS SUPPORT]       Could not retrieve SPID; '
                           || 'process may have already terminated.'
                        );
                  END;

            END;

         ELSIF l_killed_status = 'GONE'
         THEN
            DBMS_OUTPUT.PUT_LINE(
               '   [PHASE 2 - INFO]    Session ' || r.sid
               || ' no longer exists; Oracle already cleaned it up.'
            );
         ELSE
            DBMS_OUTPUT.PUT_LINE(
               '   [PHASE 2 - WARN]    Session ' || r.sid
               || ' is in status [' || l_killed_status
               || '] after kill; skipping disconnect.'
            );
         END IF;

      END IF;  -- l_kill_success

      DBMS_OUTPUT.PUT_LINE(
         '--------------------------------------------------'
      );

   END LOOP;

   -- Summary
   IF l_session_count = 0
   THEN
      DBMS_OUTPUT.PUT_LINE(
         'No eligible sessions found for user: ' || l_target_user
      );
   ELSE
      DBMS_OUTPUT.PUT_LINE(
         'Process complete. Total sessions processed: ' || l_session_count
      );
   END IF;

END;
/

Good question. Before you can feed tables into `PDROP_BG_PKG.ADD_TARGET`, you need to identify **which tables are actually partitioned**, what type they are, and whether they are the right kind (RANGE by DATE/TIMESTAMP) that the package supports. Here is a full step-by-step scouting guide.

---

# Step 2.1: Identifying Partitioned Tables in a Schema

## The Key Data Dictionary Views

`ALL_PART_TABLES` displays object-level partitioning information for all partitioned tables accessible to the current user. There are three scope levels:

| View | Scope |
|------|-------|
| `USER_PART_TABLES` | Only tables you own |
| `ALL_PART_TABLES` | Tables you can access |
| `DBA_PART_TABLES` | Every table in the database (DBA privilege required) |

`NUM_ROWS` and stats-related columns are populated only if you collect statistics on the table using the `DBMS_STATS` package.

---

## Query 1: Full List of Partitioned Tables in a Schema

This is your **starting point**. Run this to see all partitioned tables with their partition type and count:

```sql
SELECT
    pt.owner,
    pt.table_name,
    pt.partitioning_type,        -- RANGE, LIST, HASH, INTERVAL, etc.
    pt.subpartitioning_type,     -- NONE, HASH, LIST, RANGE
    pt.partition_count,
    pt.def_subpartition_count,
    t.num_rows                   -- from all_tables (global stats)
FROM all_part_tables  pt
JOIN all_tables       t
  ON t.owner      = pt.owner
 AND t.table_name = pt.table_name
WHERE pt.owner = UPPER('YOUR_SCHEMA_HERE')
ORDER BY pt.table_name;
```

---

## Query 2: Filter Only RANGE-Partitioned Tables (What the Package Needs)

The `PDROP_BG_PKG` only processes **RANGE** partitioned tables with a **single DATE/TIMESTAMP key column**. This query narrows down only those:

```sql
SELECT
    pk.owner,
    pk.name           AS table_name,
    pk.column_name    AS partition_key_col,
    c.data_type       AS partition_key_type,
    pt.partitioning_type,
    pt.interval,                -- NOT NULL means it is INTERVAL type
    pt.partition_count
FROM all_part_key_columns  pk
JOIN all_tab_cols           c
  ON c.owner       = pk.owner
 AND c.table_name  = pk.name
 AND c.column_name = pk.column_name
JOIN all_part_tables        pt
  ON pt.owner      = pk.owner
 AND pt.table_name = pk.name
WHERE pk.owner       = UPPER('YOUR_SCHEMA_HERE')
  AND pk.object_type = 'TABLE'
  AND pt.partitioning_type IN ('RANGE', 'INTERVAL')
  AND c.data_type IN ('DATE', 'TIMESTAMP',
                      'TIMESTAMP(3)', 'TIMESTAMP(6)',
                      'TIMESTAMP WITH TIME ZONE',
                      'TIMESTAMP WITH LOCAL TIME ZONE')
ORDER BY pk.owner, pk.name;
```

> This mirrors exactly the logic inside `table_has_single_date_key()` in your package. If a table appears here, it is a valid candidate.

---

## Query 3: Tables with MULTIPLE Partition Keys (Disqualified by the Package)

These will be caught by `SKIP-KEY-NOT-DATE`. Identify them early so you do not add them as targets:

```sql
SELECT
    owner,
    name     AS table_name,
    COUNT(*) AS key_col_count
FROM all_part_key_columns
WHERE owner       = UPPER('YOUR_SCHEMA_HERE')
  AND object_type = 'TABLE'
GROUP BY owner, name
HAVING COUNT(*) > 1
ORDER BY name;
```

---

## Query 4: Partition Detail Per Table (Zero-Row Candidates Preview)

This is the **core of Step 2.1** from the package's perspective. It shows partitions with `NUM_ROWS = 0` per dictionary stats WITHOUT running the package yet:

```sql
SELECT
    tp.table_owner,
    tp.table_name,
    tp.partition_name,
    tp.partition_position,
    tp.num_rows,
    tp.last_analyzed,
    tp.high_value,             -- LONG column, may display truncated in some clients
    pt.partitioning_type,
    pt.interval
FROM all_tab_partitions  tp
JOIN all_part_tables     pt
  ON pt.owner      = tp.table_owner
 AND pt.table_name = tp.table_name
WHERE tp.table_owner = UPPER('YOUR_SCHEMA_HERE')
  AND tp.num_rows    = 0
  AND tp.last_analyzed IS NOT NULL    -- only trust analyzed partitions
ORDER BY tp.table_name, tp.partition_position;
```

> The `last_analyzed IS NOT NULL` guard is critical: `NUM_ROWS = 0` on a **never-analyzed** partition means nothing. The package has a `SKIP-NOSTATS` guard for exactly this reason.

---

## Query 5: Summary Roll-Up Before Adding Targets

Use this to get a **count summary per table** before deciding which tables to register:

```sql
SELECT
    tp.table_owner,
    tp.table_name,
    pt.partitioning_type,
    pt.interval,
    COUNT(*)                                                  AS total_partitions,
    SUM(CASE WHEN tp.num_rows = 0
              AND tp.last_analyzed IS NOT NULL
             THEN 1 ELSE 0 END)                               AS zero_row_partitions,
    SUM(CASE WHEN tp.last_analyzed IS NULL
             THEN 1 ELSE 0 END)                               AS unanalyzed_partitions,
    ROUND(SUM(s.bytes) / 1024 / 1024, 2)                      AS total_segment_mb
FROM all_tab_partitions   tp
JOIN all_part_tables      pt
  ON pt.owner      = tp.table_owner
 AND pt.table_name = tp.table_name
LEFT JOIN all_segments    s
  ON s.owner        = tp.table_owner
 AND s.segment_name = tp.table_name
 AND s.segment_type IN ('TABLE', 'TABLE PARTITION', 'TABLE SUBPARTITION')
WHERE tp.table_owner = UPPER('YOUR_SCHEMA_HERE')
GROUP BY tp.table_owner, tp.table_name, pt.partitioning_type, pt.interval
ORDER BY zero_row_partitions DESC, total_segment_mb DESC;
```

This gives you a prioritized list: tables with the most zero-row partitions and the largest segment size are the best candidates to register first.

---

## Recommended Workflow

```
Step 1: Run Query 1  →  See ALL partitioned tables in the schema
Step 2: Run Query 3  →  Remove multi-key tables from your shortlist
Step 3: Run Query 2  →  Confirm single DATE/TIMESTAMP RANGE tables only
Step 4: Run Query 5  →  Rank by zero-row partition count and MB
Step 5: Run Query 4  →  Inspect specific partitions before adding targets
Step 6: Register via PDROP_BG_PKG.ADD_TARGET for valid candidates only
```

---

## Quick Note on `HIGH_VALUE` in SQL Clients

The `HIGH_VALUE` column behaves differently for simple vs. composite partitioned tables: in a simple partitioned table the partition physically contains the data, while in a composite partitioned table the partition is metadata and data is stored in subpartitions. In most SQL clients (SQL Developer, SQLcl), the `HIGH_VALUE` `LONG` column will display truncated or require special handling. If it shows blank or cut off, use this workaround:

```sql
-- Workaround to display HIGH_VALUE as VARCHAR2 in clients
SELECT
    partition_name,
    partition_position,
    num_rows,
    last_analyzed,
    DBMS_LOB.SUBSTR(
        TO_LOB(high_value), 200, 1
    ) AS high_value_text
FROM all_tab_partitions
WHERE table_owner = UPPER('YOUR_SCHEMA_HERE')
  AND table_name  = UPPER('YOUR_TABLE_HERE')
ORDER BY partition_position;
```

> Replace `YOUR_SCHEMA_HERE` and `YOUR_TABLE_HERE` with actual values. The `TO_LOB` + `DBMS_LOB.SUBSTR` pattern avoids the `LONG` display limitation without needing a PL/SQL block.


--------------------------------------------------------------------------------------

--Step 2.1: Report Partitions with Zero Rows per Dictionary Stats

-- Report all partitions where stats say 0 rows or stats are missing

-- Owner: change 'YOUR_SCHEMA' to your schema name

SELECT tp.table_owner,

    tp.table_name,

    tp.partition_name,

    tp.partition_position,

    tp.high_value,

    tp.num_rows,

    tp.blocks,

    tp.last_analyzed,

    CASE

        WHEN tp.last_analyzed IS NULL THEN 'NO STATS'

        WHEN tp.num_rows IS NULL      THEN 'NO STATS'

        WHEN tp.num_rows = 0          THEN 'ZERO ROWS (per stats)'

        ELSE 'HAS ROWS'

    END AS stats_verdict

FROM

    dba_tab_partitions tp

WHERE

    tp.table_owner = 'YOUR_SCHEMA'

    AND tp.table_name IN ('YOUR_TABLE1','YOUR_TABLE2')  -- scope it!

    AND (tp.num_rows = 0 OR tp.num_rows IS NULL OR tp.last_analyzed IS NULL)

ORDER BY

    tp.table_name, tp.partition_position;


--Step 2.2: Cross-Check Against Physical Segments

-- Cross-check: does the partition have a physical segment allocated?

SELECT

    tp.table_owner,

    tp.table_name,

    tp.partition_name,

    tp.num_rows          AS stats_num_rows,

    tp.blocks            AS stats_blocks,

    tp.last_analyzed,

    seg.bytes / 1024     AS segment_kb,

    seg.blocks           AS segment_blocks,

    CASE

        WHEN seg.segment_name IS NULL THEN 'NO SEGMENT (deferred/never created)'

        WHEN seg.blocks <= 8          THEN 'SEGMENT EXISTS BUT TINY (likely empty)'

        ELSE 'SEGMENT EXISTS WITH BLOCKS'

    END AS segment_verdict

FROM

    dba_tab_partitions tp

LEFT JOIN

    dba_segments seg

    ON  seg.owner          = tp.table_owner

    AND seg.segment_name   = tp.table_name

    AND seg.partition_name = tp.partition_name

    AND seg.segment_type   = 'TABLE PARTITION'

WHERE

    tp.table_owner = 'YOUR_SCHEMA'

    AND tp.table_name IN ('YOUR_TABLE1','YOUR_TABLE2')

    AND (tp.num_rows = 0 OR tp.num_rows IS NULL OR tp.last_analyzed IS NULL)

ORDER BY

    tp.table_name, tp.partition_position;


--Step 2.3: Generate and Run Direct COUNT Verification

-- Generate direct COUNT(*) statements for suspect partitions that have segments
SELECT
    'SELECT ''' || tp.table_owner || ''', ''' || tp.table_name
    || ''', ''' || tp.partition_name
    || ''', COUNT(*) AS actual_rows FROM '
    || tp.table_owner || '.' || tp.table_name
    || ' PARTITION (' || tp.partition_name || ');'   AS verify_sql
FROM
    dba_tab_partitions tp
JOIN
    dba_segments seg
    ON  seg.owner          = tp.table_owner
    AND seg.segment_name   = tp.table_name
    AND seg.partition_name = tp.partition_name
    AND seg.segment_type   = 'TABLE PARTITION'
WHERE
    tp.table_owner = 'YOUR_SCHEMA'
    AND tp.table_name IN ('YOUR_TABLE1','YOUR_TABLE2')
    AND (tp.num_rows = 0 OR tp.num_rows IS NULL OR tp.last_analyzed IS NULL)
ORDER BY
    tp.table_name, tp.partition_position;

--Step 2.4: Check for Global Indexes on the Target Tables

-- Identify global indexes on your target tables (highest risk)
SELECT
    i.owner,
    i.index_name,
    i.table_name,
    i.partitioned,
    i.status,
    CASE i.partitioned
        WHEN 'NO'  THEN 'GLOBAL NON-PARTITIONED (will be affected)'
        WHEN 'YES' THEN 'CHECK GLOBAL PARTITIONED INDEXES SEPARATELY'
    END AS risk_note
FROM
    dba_indexes i
WHERE
    i.table_owner = 'YOUR_SCHEMA'
    AND i.table_name IN ('YOUR_TABLE1','YOUR_TABLE2')
ORDER BY
    i.table_name, i.partitioned;



CREATE OR REPLACE PACKAGE BODY PDROP_BG_PKG AS

    --------------------------------------------------------------------------
    -- Helper: Y/N flag
    --------------------------------------------------------------------------
    FUNCTION is_yes(p_value IN VARCHAR2) RETURN BOOLEAN IS
    BEGIN
        RETURN UPPER(SUBSTR(NVL(p_value, 'N'), 1, 1)) = 'Y';
    END is_yes;

    --------------------------------------------------------------------------
    -- Helper: normalize owner/table names
    --------------------------------------------------------------------------
    FUNCTION normalize_name(p_name IN VARCHAR2) RETURN VARCHAR2 IS
    BEGIN
        RETURN UPPER(TRIM(p_name));
    END normalize_name;

    --------------------------------------------------------------------------
    -- Helper: double-quote object names safely
    --------------------------------------------------------------------------
    FUNCTION qname(p_name IN VARCHAR2) RETURN VARCHAR2 IS
    BEGIN
        RETURN '"' || REPLACE(p_name, '"', '""') || '"';
    END qname;

    --------------------------------------------------------------------------
    -- Segment MB for table and table partitions/subpartitions
    --------------------------------------------------------------------------
    FUNCTION get_segment_mb (
        p_owner IN VARCHAR2,
        p_table IN VARCHAR2
    ) RETURN NUMBER IS
        l_mb NUMBER;
    BEGIN
        SELECT ROUND(NVL(SUM(bytes), 0) / 1024 / 1024, 2)
        INTO l_mb
        FROM all_segments
        WHERE owner = p_owner
          AND segment_name = p_table
          AND segment_type IN (
                'TABLE',
                'TABLE PARTITION',
                'TABLE SUBPARTITION'
              );

        RETURN l_mb;
    EXCEPTION
        WHEN OTHERS THEN
            RETURN NULL;
    END get_segment_mb;

    --------------------------------------------------------------------------
    -- Partition count for target table
    --------------------------------------------------------------------------
    FUNCTION get_partition_count (
        p_owner IN VARCHAR2,
        p_table IN VARCHAR2
    ) RETURN NUMBER IS
        l_cnt NUMBER;
    BEGIN
        SELECT COUNT(*)
        INTO l_cnt
        FROM all_tab_partitions
        WHERE table_owner = p_owner
          AND table_name  = p_table;

        RETURN l_cnt;
    EXCEPTION
        WHEN OTHERS THEN
            RETURN NULL;
    END get_partition_count;

    --------------------------------------------------------------------------
    -- Count unusable global indexes for a target table
    --------------------------------------------------------------------------
    FUNCTION get_unusable_global_index_count (
        p_owner IN VARCHAR2,
        p_table IN VARCHAR2
    ) RETURN NUMBER IS
        l_cnt NUMBER;
    BEGIN
        SELECT
            (
                SELECT COUNT(*)
                FROM all_indexes i
                WHERE i.table_owner = p_owner
                  AND i.table_name  = p_table
                  AND i.partitioned = 'NO'
                  AND i.status      = 'UNUSABLE'
            )
            +
            (
                SELECT COUNT(*)
                FROM all_ind_partitions ip
                JOIN all_indexes i
                  ON i.owner      = ip.index_owner
                 AND i.index_name = ip.index_name
                JOIN all_part_indexes pi
                  ON pi.owner      = i.owner
                 AND pi.index_name = i.index_name
                WHERE i.table_owner = p_owner
                  AND i.table_name  = p_table
                  AND pi.locality   = 'GLOBAL'
                  AND ip.status     = 'UNUSABLE'
            )
        INTO l_cnt
        FROM dual;

        RETURN l_cnt;
    EXCEPTION
        WHEN OTHERS THEN
            RETURN NULL;
    END get_unusable_global_index_count;

    --------------------------------------------------------------------------
    -- Validate table is single-column DATE/TIMESTAMP partitioned.
    -- NOTE:
    --   This is used as a warning guard only in run_cleanup.
    --   The package still relies on HIGH_VALUE date parsing before any DROP.
    --------------------------------------------------------------------------
    FUNCTION table_has_single_date_key (
        p_owner IN VARCHAR2,
        p_table IN VARCHAR2
    ) RETURN BOOLEAN IS
        l_key_count NUMBER;
        l_data_type VARCHAR2(128);
    BEGIN
        SELECT COUNT(*), MAX(c.data_type)
        INTO l_key_count, l_data_type
        FROM all_part_key_columns pk
        JOIN all_tab_cols c
          ON c.owner       = pk.owner
         AND c.table_name  = pk.name
         AND c.column_name = pk.column_name
        WHERE pk.owner       = p_owner
          AND pk.name        = p_table
          AND pk.object_type = 'TABLE';

        RETURN (
            l_key_count = 1
            AND (
                l_data_type = 'DATE'
                OR l_data_type LIKE 'TIMESTAMP%'
            )
        );
    EXCEPTION
        WHEN OTHERS THEN
            RETURN FALSE;
    END table_has_single_date_key;

    --------------------------------------------------------------------------
    -- Convert partition HIGH_VALUE text to DATE
    --------------------------------------------------------------------------
    FUNCTION get_hv_date(p_hv_text IN VARCHAR2) RETURN DATE IS
        l_d         DATE;
        l_date_text VARCHAR2(100);
    BEGIN
        IF p_hv_text IS NULL OR UPPER(p_hv_text) LIKE '%MAXVALUE%' THEN
            RETURN NULL;
        END IF;

        -- Attempt 1: direct CAST from Oracle-generated HIGH_VALUE expression
        BEGIN
            EXECUTE IMMEDIATE
                'SELECT CAST(' || p_hv_text || ' AS DATE) FROM dual'
            INTO l_d;

            RETURN l_d;
        EXCEPTION
            WHEN OTHERS THEN
                NULL;
        END;

        -- Attempt 2: TIMESTAMP fallback
        BEGIN
            EXECUTE IMMEDIATE
                'SELECT TRUNC(CAST(' || p_hv_text || ' AS TIMESTAMP)) FROM dual'
            INTO l_d;

            RETURN l_d;
        EXCEPTION
            WHEN OTHERS THEN
                NULL;
        END;

        -- Attempt 3: regex fallback for common DATE text
        l_date_text := REGEXP_SUBSTR(
                           p_hv_text,
                           '[0-9]{4}-[0-9]{2}-[0-9]{2}'
                       );

        IF l_date_text IS NOT NULL THEN
            BEGIN
                RETURN TO_DATE(l_date_text, 'YYYY-MM-DD');
            EXCEPTION
                WHEN OTHERS THEN
                    RETURN NULL;
            END;
        END IF;

        RETURN NULL;
    END get_hv_date;

    --------------------------------------------------------------------------
    -- Detail log
    --------------------------------------------------------------------------
    PROCEDURE log_detail (
        p_run_id             IN VARCHAR2,
        p_owner              IN VARCHAR2,
        p_table              IN VARCHAR2,
        p_partition          IN VARCHAR2,
        p_partition_position IN NUMBER,
        p_max_partition_pos  IN NUMBER,
        p_high_value_text    IN VARCHAR2,
        p_high_value_date    IN DATE,
        p_stats_num_rows     IN NUMBER,
        p_last_analyzed      IN DATE,
        p_live_count         IN NUMBER,
        p_action             IN VARCHAR2,
        p_sql_text           IN CLOB,
        p_error_msg          IN VARCHAR2 DEFAULT NULL
    ) IS
        PRAGMA AUTONOMOUS_TRANSACTION;
    BEGIN
        INSERT INTO PDROP_BG_DETAIL (
            run_id,
            source_owner,
            table_name,
            partition_name,
            partition_position,
            max_partition_pos,
            high_value_text,
            high_value_date,
            stats_num_rows,
            last_analyzed,
            live_count,
            action,
            sql_text,
            error_msg
        )
        VALUES (
            p_run_id,
            p_owner,
            p_table,
            p_partition,
            p_partition_position,
            p_max_partition_pos,
            p_high_value_text,
            p_high_value_date,
            p_stats_num_rows,
            p_last_analyzed,
            p_live_count,
            p_action,
            p_sql_text,
            p_error_msg
        );

        COMMIT;
    EXCEPTION
        WHEN OTHERS THEN
            NULL;
    END log_detail;

    --------------------------------------------------------------------------
    -- Index log
    --------------------------------------------------------------------------
    PROCEDURE log_index (
        p_run_id      IN VARCHAR2,
        p_index_owner IN VARCHAR2,
        p_index_name  IN VARCHAR2,
        p_partition   IN VARCHAR2,
        p_table_owner IN VARCHAR2,
        p_table_name  IN VARCHAR2,
        p_action      IN VARCHAR2,
        p_sql_text    IN CLOB,
        p_error_msg   IN VARCHAR2 DEFAULT NULL
    ) IS
        PRAGMA AUTONOMOUS_TRANSACTION;
    BEGIN
        INSERT INTO PDROP_BG_INDEX_LOG (
            run_id,
            index_owner,
            index_name,
            partition_name,
            table_owner,
            table_name,
            action,
            sql_text,
            error_msg
        )
        VALUES (
            p_run_id,
            p_index_owner,
            p_index_name,
            p_partition,
            p_table_owner,
            p_table_name,
            p_action,
            p_sql_text,
            p_error_msg
        );

        COMMIT;
    EXCEPTION
        WHEN OTHERS THEN
            NULL;
    END log_index;

    --------------------------------------------------------------------------
    -- Clear target list
    --------------------------------------------------------------------------
    PROCEDURE clear_targets (
        p_owner IN VARCHAR2 DEFAULT NULL
    ) IS
        l_owner VARCHAR2(128);
    BEGIN
        IF p_owner IS NULL THEN
            DELETE FROM PDROP_BG_TARGETS;
        ELSE
            l_owner := normalize_name(p_owner);

            DELETE FROM PDROP_BG_TARGETS
            WHERE source_owner = l_owner;
        END IF;

        COMMIT;
    END clear_targets;

    --------------------------------------------------------------------------
    -- Add target table
    --------------------------------------------------------------------------
    PROCEDURE add_target (
        p_owner        IN VARCHAR2,
        p_table_name   IN VARCHAR2,
        p_enabled_flag IN VARCHAR2 DEFAULT 'Y',
        p_notes        IN VARCHAR2 DEFAULT NULL
    ) IS
        l_owner      VARCHAR2(128);
        l_table_name VARCHAR2(128);
    BEGIN
        l_owner      := normalize_name(p_owner);
        l_table_name := normalize_name(p_table_name);

        MERGE INTO PDROP_BG_TARGETS t
        USING (
            SELECT
                l_owner      AS source_owner,
                l_table_name AS table_name
            FROM dual
        ) s
        ON (
            t.source_owner = s.source_owner
            AND t.table_name = s.table_name
        )
        WHEN MATCHED THEN
            UPDATE SET
                t.enabled_flag = UPPER(SUBSTR(NVL(p_enabled_flag, 'Y'), 1, 1)),
                t.notes        = p_notes,
                t.updated_at   = SYSTIMESTAMP
        WHEN NOT MATCHED THEN
            INSERT (
                source_owner,
                table_name,
                enabled_flag,
                notes
            )
            VALUES (
                s.source_owner,
                s.table_name,
                UPPER(SUBSTR(NVL(p_enabled_flag, 'Y'), 1, 1)),
                p_notes
            );

        COMMIT;
    END add_target;

    --------------------------------------------------------------------------
    -- Create run
    --------------------------------------------------------------------------
    PROCEDURE create_run (
        p_run_mode                 IN VARCHAR2 DEFAULT 'DRY_RUN',
        p_cutoff_date              IN DATE     DEFAULT DATE '2025-01-01',
        p_update_global_indexes    IN VARCHAR2 DEFAULT 'Y',
        p_gather_stats             IN VARCHAR2 DEFAULT 'Y',
        p_rebuild_unusable_indexes IN VARCHAR2 DEFAULT 'Y',
        p_parallel_degree          IN NUMBER   DEFAULT 8,
        p_log_skips                IN VARCHAR2 DEFAULT 'N',
        p_run_id                   OUT VARCHAR2
    ) IS
        l_seq       NUMBER;
        l_run_id    VARCHAR2(100);
        l_job_name  VARCHAR2(128);
        l_targets   NUMBER;
        l_run_mode  VARCHAR2(20) := UPPER(TRIM(p_run_mode));
    BEGIN
        IF l_run_mode NOT IN ('DRY_RUN', 'EXECUTE') THEN
            RAISE_APPLICATION_ERROR(-20001, 'Run mode must be DRY_RUN or EXECUTE.');
        END IF;

        SELECT COUNT(*)
        INTO l_targets
        FROM PDROP_BG_TARGETS
        WHERE enabled_flag = 'Y';

        IF l_targets = 0 THEN
            RAISE_APPLICATION_ERROR(-20002, 'No enabled targets found in PDROP_BG_TARGETS.');
        END IF;

        SELECT PDROP_BG_SEQ.NEXTVAL
        INTO l_seq
        FROM dual;

        l_run_id   := 'PDROP_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDD_HH24MISS_') || l_seq;
        l_job_name := 'PDROPJ_' || l_seq;

        INSERT INTO PDROP_BG_RUNS (
            run_id,
            scheduler_job_name,
            run_mode,
            cutoff_date,
            update_global_indexes,
            gather_stats,
            rebuild_unusable_indexes,
            parallel_degree,
            log_skips,
            status,
            created_by,
            total_targets,
            total_partitions_checked,
            candidate_count,
            dropped_count,
            error_count,
            warning_count,
            notes
        )
        VALUES (
            l_run_id,
            l_job_name,
            l_run_mode,
            p_cutoff_date,
            UPPER(SUBSTR(NVL(p_update_global_indexes, 'Y'), 1, 1)),
            UPPER(SUBSTR(NVL(p_gather_stats, 'Y'), 1, 1)),
            UPPER(SUBSTR(NVL(p_rebuild_unusable_indexes, 'Y'), 1, 1)),
            p_parallel_degree,
            UPPER(SUBSTR(NVL(p_log_skips, 'N'), 1, 1)),
            'READY',
            SYS_CONTEXT('USERENV', 'SESSION_USER'),
            l_targets,
            0,
            0,
            0,
            0,
            0,
            CASE
                WHEN l_run_mode = 'EXECUTE'
                 AND UPPER(SUBSTR(NVL(p_update_global_indexes, 'Y'), 1, 1)) = 'N'
                THEN 'EXECUTE without UPDATE GLOBAL INDEXES. Scheduler rebuild fallback should remain enabled.'
                ELSE NULL
            END
        );

        INSERT INTO PDROP_BG_RUN_TARGETS (
            run_id,
            source_owner,
            table_name,
            status,
            candidate_count,
            dropped_count,
            error_count
        )
        SELECT
            l_run_id,
            source_owner,
            table_name,
            'PENDING',
            0,
            0,
            0
        FROM PDROP_BG_TARGETS
        WHERE enabled_flag = 'Y';

        COMMIT;

        p_run_id := l_run_id;
    END create_run;

    --------------------------------------------------------------------------
    -- Capture before checks.
    -- Guarded by before_time IS NULL to avoid duplicate baseline overwrite.
    --------------------------------------------------------------------------
    PROCEDURE capture_before (
        p_run_id IN VARCHAR2
    ) IS
        l_partition_count NUMBER;
        l_segment_mb      NUMBER;
        l_unusable_count  NUMBER;
    BEGIN
        FOR t IN (
            SELECT source_owner, table_name
            FROM PDROP_BG_RUN_TARGETS
            WHERE run_id = p_run_id
              AND before_time IS NULL
            ORDER BY source_owner, table_name
        ) LOOP

            l_partition_count := get_partition_count(t.source_owner, t.table_name);
            l_segment_mb      := get_segment_mb(t.source_owner, t.table_name);
            l_unusable_count  := get_unusable_global_index_count(t.source_owner, t.table_name);

            UPDATE PDROP_BG_RUN_TARGETS
            SET before_time                 = SYSTIMESTAMP,
                before_partition_count      = l_partition_count,
                before_segment_mb           = l_segment_mb,
                before_unusable_index_count = l_unusable_count,
                status                      = CASE
                                                  WHEN status = 'PENDING'
                                                  THEN 'BEFORE_CAPTURED'
                                                  ELSE status
                                              END
            WHERE run_id       = p_run_id
              AND source_owner = t.source_owner
              AND table_name   = t.table_name;
        END LOOP;

        UPDATE PDROP_BG_RUNS
        SET status = CASE
                         WHEN status = 'READY'
                         THEN 'BEFORE_CAPTURED'
                         ELSE status
                     END
        WHERE run_id = p_run_id;

        COMMIT;
    END capture_before;

    --------------------------------------------------------------------------
    -- Rebuild unusable global indexes for one table
    --------------------------------------------------------------------------
    PROCEDURE rebuild_indexes_for_table (
        p_run_id          IN VARCHAR2,
        p_owner           IN VARCHAR2,
        p_table           IN VARCHAR2,
        p_parallel_degree IN NUMBER,
        p_rebuild_count   IN OUT NUMBER,
        p_table_errors    IN OUT NUMBER,
        p_total_errors    IN OUT NUMBER
    ) IS
        l_sql    VARCHAR2(32767);
        l_errmsg VARCHAR2(4000);
    BEGIN
        ----------------------------------------------------------------------
        -- Non-partitioned global indexes
        ----------------------------------------------------------------------
        FOR i IN (
            SELECT owner, index_name
            FROM all_indexes
            WHERE table_owner = p_owner
              AND table_name  = p_table
              AND partitioned = 'NO'
              AND status      = 'UNUSABLE'
            ORDER BY owner, index_name
        ) LOOP
            BEGIN
                l_sql :=
                    'ALTER INDEX ' || qname(i.owner) || '.' || qname(i.index_name) ||
                    ' REBUILD PARALLEL ' || p_parallel_degree;

                EXECUTE IMMEDIATE l_sql;

                BEGIN
                    EXECUTE IMMEDIATE
                        'ALTER INDEX ' || qname(i.owner) || '.' || qname(i.index_name) ||
                        ' NOPARALLEL';
                EXCEPTION
                    WHEN OTHERS THEN
                        NULL;
                END;

                p_rebuild_count := p_rebuild_count + 1;

                log_index(
                    p_run_id,
                    i.owner,
                    i.index_name,
                    NULL,
                    p_owner,
                    p_table,
                    'IDX-REBUILT',
                    l_sql,
                    NULL
                );

            EXCEPTION
                WHEN OTHERS THEN
                    l_errmsg := SQLERRM;
                    p_table_errors := p_table_errors + 1;
                    p_total_errors := p_total_errors + 1;

                    log_index(
                        p_run_id,
                        i.owner,
                        i.index_name,
                        NULL,
                        p_owner,
                        p_table,
                        'IDX-ERROR',
                        l_sql,
                        l_errmsg
                    );
            END;
        END LOOP;

        ----------------------------------------------------------------------
        -- Partitioned GLOBAL indexes only
        ----------------------------------------------------------------------
        FOR ip IN (
            SELECT
                ip.index_owner,
                ip.index_name,
                ip.partition_name
            FROM all_ind_partitions ip
            JOIN all_indexes i
              ON i.owner      = ip.index_owner
             AND i.index_name = ip.index_name
            JOIN all_part_indexes pi
              ON pi.owner      = i.owner
             AND pi.index_name = i.index_name
            WHERE i.table_owner = p_owner
              AND i.table_name  = p_table
              AND pi.locality   = 'GLOBAL'
              AND ip.status     = 'UNUSABLE'
            ORDER BY ip.index_owner, ip.index_name, ip.partition_name
        ) LOOP
            BEGIN
                l_sql :=
                    'ALTER INDEX ' || qname(ip.index_owner) || '.' || qname(ip.index_name) ||
                    ' REBUILD PARTITION ' || qname(ip.partition_name) ||
                    ' PARALLEL ' || p_parallel_degree;

                EXECUTE IMMEDIATE l_sql;

                BEGIN
                    EXECUTE IMMEDIATE
                        'ALTER INDEX ' || qname(ip.index_owner) || '.' || qname(ip.index_name) ||
                        ' NOPARALLEL';
                EXCEPTION
                    WHEN OTHERS THEN
                        NULL;
                END;

                p_rebuild_count := p_rebuild_count + 1;

                log_index(
                    p_run_id,
                    ip.index_owner,
                    ip.index_name,
                    ip.partition_name,
                    p_owner,
                    p_table,
                    'IDX-PART-REBUILT',
                    l_sql,
                    NULL
                );

            EXCEPTION
                WHEN OTHERS THEN
                    l_errmsg := SQLERRM;
                    p_table_errors := p_table_errors + 1;
                    p_total_errors := p_total_errors + 1;

                    log_index(
                        p_run_id,
                        ip.index_owner,
                        ip.index_name,
                        ip.partition_name,
                        p_owner,
                        p_table,
                        'IDX-PART-ERROR',
                        l_sql,
                        l_errmsg
                    );
            END;
        END LOOP;
    END rebuild_indexes_for_table;

    --------------------------------------------------------------------------
    -- Main cleanup procedure executed by scheduler
    --------------------------------------------------------------------------
    PROCEDURE run_cleanup (
        p_run_id IN VARCHAR2
    ) IS
        l_run                 PDROP_BG_RUNS%ROWTYPE;
        l_last_pos            NUMBER;
        l_hv_long             LONG;
        l_hv_text             VARCHAR2(4000);
        l_hv_date             DATE;
        l_live_count          NUMBER;
        l_sql                 VARCHAR2(32767);

        l_table_candidates    NUMBER;
        l_table_dropped       NUMBER;
        l_table_errors        NUMBER;

        l_total_partitions    NUMBER := 0;
        l_total_candidates    NUMBER := 0;
        l_total_dropped       NUMBER := 0;
        l_total_errors        NUMBER := 0;
        l_total_warnings      NUMBER := 0;
        l_index_rebuild_count NUMBER := 0;

        l_after_partition_count NUMBER;
        l_after_segment_mb      NUMBER;
        l_after_unusable_count  NUMBER;

        l_errmsg VARCHAR2(4000);
    BEGIN
        SELECT *
        INTO l_run
        FROM PDROP_BG_RUNS
        WHERE run_id = p_run_id;

        UPDATE PDROP_BG_RUNS
        SET status     = 'RUNNING',
            start_time = NVL(start_time, SYSTIMESTAMP)
        WHERE run_id = p_run_id;

        COMMIT;

        -- Safe even if already captured; capture_before only updates rows where before_time IS NULL.
        capture_before(p_run_id);

        FOR t IN (
            SELECT source_owner, table_name
            FROM PDROP_BG_RUN_TARGETS
            WHERE run_id = p_run_id
            ORDER BY source_owner, table_name
        ) LOOP
            l_table_candidates := 0;
            l_table_dropped    := 0;
            l_table_errors     := 0;

            UPDATE PDROP_BG_RUN_TARGETS
            SET status = 'RUNNING'
            WHERE run_id       = p_run_id
              AND source_owner = t.source_owner
              AND table_name   = t.table_name;

            COMMIT;

            ------------------------------------------------------------------
            -- Soft metadata warning only.
            -- Do not hard-skip here; HIGH_VALUE parse/date/count guards still protect drops.
            ------------------------------------------------------------------
            IF NOT table_has_single_date_key(t.source_owner, t.table_name) THEN
                l_total_warnings := l_total_warnings + 1;

                log_detail(
                    p_run_id,
                    t.source_owner,
                    t.table_name,
                    NULL,
                    NULL,
                    NULL,
                    NULL,
                    NULL,
                    NULL,
                    NULL,
                    NULL,
                    'WARN-KEY-CHECK',
                    NULL,
                    'Partition key is not confirmed as single-column DATE/TIMESTAMP. Proceeding with HIGH_VALUE parse guard only.'
                );
            END IF;

            IF is_yes(l_run.gather_stats) THEN
                BEGIN
                    DBMS_STATS.GATHER_TABLE_STATS(
                        ownname          => t.source_owner,
                        tabname          => t.table_name,
                        estimate_percent => DBMS_STATS.AUTO_SAMPLE_SIZE,
                        cascade          => FALSE,
                        granularity      => 'ALL',
                        degree           => CASE
                                                WHEN NVL(l_run.parallel_degree, 0) > 0
                                                THEN l_run.parallel_degree
                                                ELSE DBMS_STATS.AUTO_DEGREE
                                            END,
                        no_invalidate    => DBMS_STATS.AUTO_INVALIDATE
                    );
                EXCEPTION
                    WHEN OTHERS THEN
                        l_errmsg := SQLERRM;
                        l_table_errors := l_table_errors + 1;
                        l_total_errors := l_total_errors + 1;

                        log_detail(
                            p_run_id,
                            t.source_owner,
                            t.table_name,
                            NULL,
                            NULL,
                            NULL,
                            NULL,
                            NULL,
                            NULL,
                            NULL,
                            NULL,
                            'STATS-ERROR',
                            NULL,
                            l_errmsg
                        );
                END;
            END IF;

            SELECT MAX(partition_position)
            INTO l_last_pos
            FROM all_tab_partitions
            WHERE table_owner = t.source_owner
              AND table_name  = t.table_name;

            IF l_last_pos IS NULL THEN
                l_total_warnings := l_total_warnings + 1;

                log_detail(
                    p_run_id,
                    t.source_owner,
                    t.table_name,
                    NULL,
                    NULL,
                    NULL,
                    NULL,
                    NULL,
                    NULL,
                    NULL,
                    NULL,
                    'SKIP-NO-PARTITIONS',
                    NULL,
                    'No partitions found in ALL_TAB_PARTITIONS for this target table.'
                );

                l_after_partition_count := get_partition_count(t.source_owner, t.table_name);
                l_after_segment_mb      := get_segment_mb(t.source_owner, t.table_name);
                l_after_unusable_count  := get_unusable_global_index_count(t.source_owner, t.table_name);

                UPDATE PDROP_BG_RUN_TARGETS
                SET status                     = 'SKIPPED',
                    after_time                 = SYSTIMESTAMP,
                    after_partition_count      = l_after_partition_count,
                    after_segment_mb           = l_after_segment_mb,
                    after_unusable_index_count = l_after_unusable_count,
                    candidate_count            = l_table_candidates,
                    dropped_count              = l_table_dropped,
                    error_count                = l_table_errors,
                    notes                      = 'Skipped: no partitions found.'
                WHERE run_id       = p_run_id
                  AND source_owner = t.source_owner
                  AND table_name   = t.table_name;

                UPDATE PDROP_BG_RUNS
                SET total_partitions_checked = l_total_partitions,
                    candidate_count          = l_total_candidates,
                    dropped_count            = l_total_dropped,
                    error_count              = l_total_errors,
                    warning_count            = l_total_warnings
                WHERE run_id = p_run_id;

                COMMIT;
                CONTINUE;
            END IF;

            FOR p IN (
                SELECT
                    partition_name,
                    partition_position,
                    num_rows,
                    last_analyzed,
                    high_value
                FROM all_tab_partitions
                WHERE table_owner = t.source_owner
                  AND table_name  = t.table_name
                ORDER BY partition_position
            ) LOOP
                l_total_partitions := l_total_partitions + 1;
                l_hv_long          := p.high_value;
                l_hv_text          := SUBSTR(l_hv_long, 1, 4000);
                l_hv_date          := NULL;
                l_live_count       := NULL;
                l_sql              := NULL;

                IF LENGTH(l_hv_text) >= 4000 THEN
                    l_total_warnings := l_total_warnings + 1;

                    log_detail(
                        p_run_id,
                        t.source_owner,
                        t.table_name,
                        p.partition_name,
                        p.partition_position,
                        l_last_pos,
                        l_hv_text,
                        NULL,
                        p.num_rows,
                        p.last_analyzed,
                        NULL,
                        'SKIP-HV-TRUNCATED',
                        NULL,
                        'HIGH_VALUE may be truncated at 4000 characters; manual review required.'
                    );

                    CONTINUE;
                END IF;

                IF p.partition_position = l_last_pos THEN
                    IF is_yes(l_run.log_skips) THEN
                        log_detail(
                            p_run_id,
                            t.source_owner,
                            t.table_name,
                            p.partition_name,
                            p.partition_position,
                            l_last_pos,
                            l_hv_text,
                            NULL,
                            p.num_rows,
                            p.last_analyzed,
                            NULL,
                            'SKIP-LAST-PARTITION',
                            NULL,
                            NULL
                        );
                    END IF;
                    CONTINUE;
                END IF;

                IF UPPER(l_hv_text) LIKE '%MAXVALUE%' THEN
                    IF is_yes(l_run.log_skips) THEN
                        log_detail(
                            p_run_id,
                            t.source_owner,
                            t.table_name,
                            p.partition_name,
                            p.partition_position,
                            l_last_pos,
                            l_hv_text,
                            NULL,
                            p.num_rows,
                            p.last_analyzed,
                            NULL,
                            'SKIP-MAXVALUE',
                            NULL,
                            NULL
                        );
                    END IF;
                    CONTINUE;
                END IF;

                l_hv_date := get_hv_date(l_hv_text);

                IF l_hv_date IS NULL THEN
                    l_total_warnings := l_total_warnings + 1;

                    log_detail(
                        p_run_id,
                        t.source_owner,
                        t.table_name,
                        p.partition_name,
                        p.partition_position,
                        l_last_pos,
                        l_hv_text,
                        NULL,
                        p.num_rows,
                        p.last_analyzed,
                        NULL,
                        'SKIP-HV-PARSE',
                        NULL,
                        NULL
                    );
                    CONTINUE;
                END IF;

                IF l_hv_date > l_run.cutoff_date THEN
                    IF is_yes(l_run.log_skips) THEN
                        log_detail(
                            p_run_id,
                            t.source_owner,
                            t.table_name,
                            p.partition_name,
                            p.partition_position,
                            l_last_pos,
                            l_hv_text,
                            l_hv_date,
                            p.num_rows,
                            p.last_analyzed,
                            NULL,
                            'SKIP-FUTURE',
                            NULL,
                            NULL
                        );
                    END IF;
                    CONTINUE;
                END IF;

                ------------------------------------------------------------------
                -- Always log no-stats partition skips, even when log_skips = N.
                ------------------------------------------------------------------
                IF p.last_analyzed IS NULL THEN
                    l_total_warnings := l_total_warnings + 1;

                    log_detail(
                        p_run_id,
                        t.source_owner,
                        t.table_name,
                        p.partition_name,
                        p.partition_position,
                        l_last_pos,
                        l_hv_text,
                        l_hv_date,
                        p.num_rows,
                        p.last_analyzed,
                        NULL,
                        'SKIP-NOSTATS',
                        NULL,
                        'Partition has no statistics; skipped for safety.'
                    );

                    CONTINUE;
                END IF;

                IF NVL(p.num_rows, -1) <> 0 THEN
                    IF is_yes(l_run.log_skips) THEN
                        log_detail(
                            p_run_id,
                            t.source_owner,
                            t.table_name,
                            p.partition_name,
                            p.partition_position,
                            l_last_pos,
                            l_hv_text,
                            l_hv_date,
                            p.num_rows,
                            p.last_analyzed,
                            NULL,
                            'SKIP-STATS-HASROWS',
                            NULL,
                            NULL
                        );
                    END IF;
                    CONTINUE;
                END IF;

                BEGIN
                    l_sql :=
                        'SELECT COUNT(*) FROM ' ||
                        qname(t.source_owner) || '.' || qname(t.table_name) ||
                        ' PARTITION (' || qname(p.partition_name) || ')';

                    EXECUTE IMMEDIATE l_sql INTO l_live_count;

                EXCEPTION
                    WHEN OTHERS THEN
                        l_errmsg := SQLERRM;
                        l_table_errors := l_table_errors + 1;
                        l_total_errors := l_total_errors + 1;

                        log_detail(
                            p_run_id,
                            t.source_owner,
                            t.table_name,
                            p.partition_name,
                            p.partition_position,
                            l_last_pos,
                            l_hv_text,
                            l_hv_date,
                            p.num_rows,
                            p.last_analyzed,
                            NULL,
                            'COUNT-ERROR',
                            l_sql,
                            l_errmsg
                        );
                        CONTINUE;
                END;

                IF NVL(l_live_count, -1) <> 0 THEN
                    IF is_yes(l_run.log_skips) THEN
                        log_detail(
                            p_run_id,
                            t.source_owner,
                            t.table_name,
                            p.partition_name,
                            p.partition_position,
                            l_last_pos,
                            l_hv_text,
                            l_hv_date,
                            p.num_rows,
                            p.last_analyzed,
                            l_live_count,
                            'SKIP-LIVE-HASROWS',
                            l_sql,
                            NULL
                        );
                    END IF;
                    CONTINUE;
                END IF;

                l_table_candidates := l_table_candidates + 1;
                l_total_candidates := l_total_candidates + 1;

                l_sql :=
                    'ALTER TABLE ' ||
                    qname(t.source_owner) || '.' || qname(t.table_name) ||
                    ' DROP PARTITION ' || qname(p.partition_name);

                IF is_yes(l_run.update_global_indexes) THEN
                    l_sql := l_sql || ' UPDATE GLOBAL INDEXES';
                END IF;

                IF l_run.run_mode = 'DRY_RUN' THEN
                    log_detail(
                        p_run_id,
                        t.source_owner,
                        t.table_name,
                        p.partition_name,
                        p.partition_position,
                        l_last_pos,
                        l_hv_text,
                        l_hv_date,
                        p.num_rows,
                        p.last_analyzed,
                        l_live_count,
                        'DRY-DROP',
                        l_sql,
                        NULL
                    );
                ELSE
                    BEGIN
                        EXECUTE IMMEDIATE l_sql;

                        l_table_dropped := l_table_dropped + 1;
                        l_total_dropped := l_total_dropped + 1;

                        log_detail(
                            p_run_id,
                            t.source_owner,
                            t.table_name,
                            p.partition_name,
                            p.partition_position,
                            l_last_pos,
                            l_hv_text,
                            l_hv_date,
                            p.num_rows,
                            p.last_analyzed,
                            l_live_count,
                            'DROPPED',
                            l_sql,
                            NULL
                        );

                    EXCEPTION
                        WHEN OTHERS THEN
                            l_errmsg := SQLERRM;
                            l_table_errors := l_table_errors + 1;
                            l_total_errors := l_total_errors + 1;

                            log_detail(
                                p_run_id,
                                t.source_owner,
                                t.table_name,
                                p.partition_name,
                                p.partition_position,
                                l_last_pos,
                                l_hv_text,
                                l_hv_date,
                                p.num_rows,
                                p.last_analyzed,
                                l_live_count,
                                'DROP-ERROR',
                                l_sql,
                                l_errmsg
                            );
                    END;
                END IF;

                UPDATE PDROP_BG_RUN_TARGETS
                SET candidate_count = l_table_candidates,
                    dropped_count   = l_table_dropped,
                    error_count     = l_table_errors
                WHERE run_id       = p_run_id
                  AND source_owner = t.source_owner
                  AND table_name   = t.table_name;

                UPDATE PDROP_BG_RUNS
                SET total_partitions_checked = l_total_partitions,
                    candidate_count          = l_total_candidates,
                    dropped_count            = l_total_dropped,
                    error_count              = l_total_errors,
                    warning_count            = l_total_warnings
                WHERE run_id = p_run_id;

                COMMIT;
            END LOOP;

            IF l_run.run_mode = 'EXECUTE'
               AND l_table_dropped > 0
               AND is_yes(l_run.rebuild_unusable_indexes)
            THEN
                rebuild_indexes_for_table(
                    p_run_id,
                    t.source_owner,
                    t.table_name,
                    l_run.parallel_degree,
                    l_index_rebuild_count,
                    l_table_errors,
                    l_total_errors
                );
            END IF;

            l_after_partition_count := get_partition_count(t.source_owner, t.table_name);
            l_after_segment_mb      := get_segment_mb(t.source_owner, t.table_name);
            l_after_unusable_count  := get_unusable_global_index_count(t.source_owner, t.table_name);

            UPDATE PDROP_BG_RUN_TARGETS
            SET status                      = CASE
                                                  WHEN l_table_errors > 0
                                                  THEN 'COMPLETED_WITH_ERRORS'
                                                  ELSE 'COMPLETED'
                                              END,
                after_time                  = SYSTIMESTAMP,
                after_partition_count       = l_after_partition_count,
                after_segment_mb            = l_after_segment_mb,
                after_unusable_index_count  = l_after_unusable_count,
                candidate_count             = l_table_candidates,
                dropped_count               = l_table_dropped,
                error_count                 = l_table_errors
            WHERE run_id       = p_run_id
              AND source_owner = t.source_owner
              AND table_name   = t.table_name;

            UPDATE PDROP_BG_RUNS
            SET total_partitions_checked = l_total_partitions,
                candidate_count          = l_total_candidates,
                dropped_count            = l_total_dropped,
                error_count              = l_total_errors,
                warning_count            = l_total_warnings
            WHERE run_id = p_run_id;

            COMMIT;
        END LOOP;

        UPDATE PDROP_BG_RUNS
        SET status      = CASE
                              WHEN l_total_errors > 0
                              THEN 'COMPLETED_WITH_ERRORS'
                              ELSE 'COMPLETED'
                          END,
            end_time    = SYSTIMESTAMP,
            total_partitions_checked = l_total_partitions,
            candidate_count          = l_total_candidates,
            dropped_count            = l_total_dropped,
            error_count              = l_total_errors,
            warning_count            = l_total_warnings,
            notes                    = CASE
                                           WHEN l_index_rebuild_count > 0
                                           THEN SUBSTR(
                                                   NVL(notes, '') ||
                                                   ' Index rebuild count=' || l_index_rebuild_count || '.',
                                                   1,
                                                   4000
                                                )
                                           ELSE notes
                                       END
        WHERE run_id = p_run_id;

        COMMIT;

    EXCEPTION
        WHEN OTHERS THEN
            l_errmsg := SQLERRM;

            UPDATE PDROP_BG_RUNS
            SET status      = 'FAILED',
                end_time    = SYSTIMESTAMP,
                notes       = SUBSTR(l_errmsg, 1, 4000),
                error_count = NVL(error_count, 0) + 1
            WHERE run_id = p_run_id;

            COMMIT;
            RAISE;
    END run_cleanup;

    --------------------------------------------------------------------------
    -- Create and enable scheduler job
    -- Includes job-exists retry safety and running-job protection.
    --------------------------------------------------------------------------
    PROCEDURE start_scheduler_job (
        p_run_id IN VARCHAR2
    ) IS
        l_job_name      VARCHAR2(128);
        l_status        VARCHAR2(50);
        l_running_count NUMBER;
        l_errmsg        VARCHAR2(4000);
    BEGIN
        SELECT scheduler_job_name, status
        INTO l_job_name, l_status
        FROM PDROP_BG_RUNS
        WHERE run_id = p_run_id;

        IF l_status = 'RUNNING' THEN
            RAISE_APPLICATION_ERROR(-20003, 'Run is already RUNNING: ' || p_run_id);
        END IF;

        SELECT COUNT(*)
        INTO l_running_count
        FROM user_scheduler_running_jobs
        WHERE job_name = UPPER(l_job_name);

        IF l_running_count > 0 THEN
            RAISE_APPLICATION_ERROR(-20004, 'Scheduler job is already running: ' || l_job_name);
        END IF;

        capture_before(p_run_id);

        BEGIN
            DBMS_SCHEDULER.DROP_JOB(
                job_name => l_job_name,
                force    => TRUE
            );
        EXCEPTION
            WHEN OTHERS THEN
                NULL;
        END;

        DBMS_SCHEDULER.CREATE_JOB(
            job_name        => l_job_name,
            job_type        => 'PLSQL_BLOCK',
            job_action      => 'BEGIN PDROP_BG_PKG.RUN_CLEANUP(''' || p_run_id || '''); END;',
            start_date      => SYSTIMESTAMP,
            enabled         => FALSE,
            auto_drop       => FALSE,
            comments        => 'Background partition cleanup for run_id ' || p_run_id
        );

        UPDATE PDROP_BG_RUNS
        SET status = 'SCHEDULER_CREATED'
        WHERE run_id = p_run_id;

        COMMIT;

        UPDATE PDROP_BG_RUNS
        SET status = 'SCHEDULER_ENABLED'
        WHERE run_id = p_run_id
          AND status NOT IN ('RUNNING', 'COMPLETED', 'COMPLETED_WITH_ERRORS', 'FAILED');

        COMMIT;

        BEGIN
            DBMS_SCHEDULER.ENABLE(l_job_name);
        EXCEPTION
            WHEN OTHERS THEN
                l_errmsg := SQLERRM;

                UPDATE PDROP_BG_RUNS
                SET status = 'SCHEDULER_ENABLE_FAILED',
                    notes  = SUBSTR(l_errmsg, 1, 4000)
                WHERE run_id = p_run_id;

                COMMIT;
                RAISE;
        END;
    END start_scheduler_job;

    --------------------------------------------------------------------------
    -- Stop scheduler job
    --------------------------------------------------------------------------
    PROCEDURE stop_scheduler_job (
        p_run_id IN VARCHAR2
    ) IS
        l_job_name VARCHAR2(128);
        l_errmsg   VARCHAR2(4000);
    BEGIN
        SELECT scheduler_job_name
        INTO l_job_name
        FROM PDROP_BG_RUNS
        WHERE run_id = p_run_id;

        BEGIN
            DBMS_SCHEDULER.STOP_JOB(
                job_name => l_job_name,
                force    => FALSE
            );

            UPDATE PDROP_BG_RUNS
            SET status = 'STOP_REQUESTED'
            WHERE run_id = p_run_id;

            COMMIT;
        EXCEPTION
            WHEN OTHERS THEN
                l_errmsg := SQLERRM;

                UPDATE PDROP_BG_RUNS
                SET status = 'STOP_FAILED',
                    notes  = SUBSTR(l_errmsg, 1, 4000)
                WHERE run_id = p_run_id;

                COMMIT;
                RAISE;
        END;
    END stop_scheduler_job;

END PDROP_BG_PKG;
/