Интернет-журнал «FORS» Архив номеров На Главную

Конвейерные табличные функции в Oracle

Oracle Pipelined Table Functions

 

Источник: ORACLE-BASE, 2013,
http://www.oracle-base.com/articles/misc/pipelined-table-functions.php

Табличные функции
(Table Functions)

Табличные функции используются для возврата PL/SQL-коллекций, которые имитируют таблицы. Они могут быть запрошены как обычные таблицы с помощью функцию TABLE во фразе FROM. Обычные табличные функции требуют, чтобы коллекции перед возвращением были полностью наполнены (населены). Так как коллекции хранятся в памяти, это может стать проблемой, поскольку на большие коллекции впустую тратится много памяти и времени в ожидании возвращения первой строки. Эти узкие возможности делают обычные табличные функции непригодными в случаях масштабных ETL-операций (ETL — Extraction Transformation Load — Извлечение-Преобразование-Загрузка). Обычные табличные функции требуют создания именованной строки и табличных типов как объектов базы данных.

-- Создание типов для функции.
DROP TYPE t_tf_tab;
DROP TYPE t_tf_row;

CREATE TYPE t_tf_row AS OBJECT (
  id           NUMBER,
  description  VARCHAR2(50)
);
/

CREATE TYPE t_tf_tab IS TABLE OF t_tf_row;
/

-- Build the table function itself.
CREATE OR REPLACE FUNCTION get_tab_tf (p_rows IN NUMBER) RETURN t_tf_tab AS
  l_tab  t_tf_tab := t_tf_tab();
BEGIN
  FOR i IN 1 .. p_rows LOOP
    l_tab.extend;
    l_tab(l_tab.last) := t_tf_row(i, 'Description for ' || i);
  END LOOP;

  RETURN l_tab;
END;
/

-- Тестирование функции.
SELECT *
FROM   TABLE(get_tab_tf(10))
ORDER BY id DESC;

        ID DESCRIPTION
---------- --------------------------------------------------
        10 Description for 10
         9 Description for 9
         8 Description for 8
         7 Description for 7
         6 Description for 6
         5 Description for 5
         4 Description for 4
         3 Description for 3
         2 Description for 2
         1 Description for 1

10 rows selected.

SQL>

Заметим, что в этом листинге строки перечислены в обратном порядке, поскольку запрос содержит фразу упорядоченности по убыванию.

Конвейерные табличные функции)
(Pipelined Table Functions

Конвейерная обработка отменяет надобность в создании огромных наборов, передавая строки по каналу из функции по мере их создания, сохраняя память и позволяя запустить последующую обработку еще до окончания генерации всех строк.

Конвейерные табличные функции включают фразу PIPELINED и используют вызов PIPE ROW, чтобы вытолкнуть строки из функции, как только они создадутся, вместо построения табличной коллекции. Заметим, что вызов RETURN пустой, поскольку нет никакой коллекции, возвращаемой из функции.

-- Построение конвейерной табличной функции.
CREATE OR REPLACE FUNCTION get_tab_ptf (p_rows IN NUMBER) RETURN t_tf_tab PIPELINED AS
BEGIN
  FOR i IN 1 .. p_rows LOOP
    PIPE ROW(t_tf_row(i, 'Description for ' || i));   
  END LOOP;

  RETURN;
END;
/

-- Тестирование 
SELECT *
FROM   TABLE(get_tab_ptf(10))
ORDER BY id DESC;

        ID DESCRIPTION
---------- --------------------------------------------------
        10 Description for 10
         9 Description for 9
         8 Description for 8
         7 Description for 7
         6 Description for 6
         5 Description for 5
         4 Description for 4
         3 Description for 3
         2 Description for 2
         1 Description for 1

10 rows selected.

SQL>

Когда ETL-операции проводятся на большом хранилище данных, наблюдается существенное повышение производительности, поскольку загрузка данных из внешних таблиц производится табличными функциями непосредственно в таблицы хранилища, избегая промежуточного размещения данных.

Исключение NO_DATA_NEEDED
(NO_DATA_NEEDED Exception)

Конвейерная табличная функция может создать больше данных, чем необходимо запросившему её процессу. Когда такое происходит, конвейерная табличная функция останавливает выполнение, порождая исключение NO_DATA_NEEDED. Оно не должно явно обрабатываться, если только в процедуру не включен обработчик исключений OTHERS.

Приведенная ниже функция возвращает 10 строк, но запрос потребовал от нее только первые 5 строк. В этом случае функция прекращает выполнение, вызывая исключение NO_DATA_NEEDED.

-- Построение конвейерной табличной функции.
CREATE OR REPLACE FUNCTION get_tab_ptf (p_rows IN NUMBER) RETURN t_tf_tab PIPELINED AS
BEGIN
  FOR i IN 1 .. p_rows LOOP
    DBMS_OUTPUT.put_line('Row: ' || i);
    PIPE ROW(t_tf_row(i, 'Description for ' || i));
  END LOOP;

  RETURN;
END;
/

-- Тестирование 
SET SERVEROUTPUT ON

SELECT *
FROM   TABLE(get_tab_ptf(10))
WHERE  rownum <= 5;

        ID DESCRIPTION
---------- --------------------------------------------------
         1 Description for 1
         2 Description for 2
         3 Description for 3
         4 Description for 4
         5 Description for 5

5 rows selected.

Row: 1
Row: 2
Row: 3
Row: 4
Row: 5
SQL>

Если имеется обработчик исключений OTHERS, то он захватит исключение NO_DATA_NEEDED и выполнит некоторый код обработки ошибок, что не нужно.

-- Построение конвейерной табличной функции.
CREATE OR REPLACE FUNCTION get_tab_ptf (p_rows IN NUMBER) RETURN t_tf_tab PIPELINED AS
BEGIN
  FOR i IN 1 .. p_rows LOOP
    DBMS_OUTPUT.put_line('Row: ' || i);
    PIPE ROW(t_tf_row(i, 'Description for ' || i));
  END LOOP;

  RETURN;
EXCEPTION
  WHEN OTHERS THEN
    DBMS_OUTPUT.put_line('OTHERS Handler');
    RAISE;
END;
/

-- Тестирование
SET SERVEROUTPUT ON

SELECT *
FROM   TABLE(get_tab_ptf(10))
WHERE  rownum <= 5;

        ID DESCRIPTION
---------- --------------------------------------------------
         1 Description for 1
         2 Description for 2
         3 Description for 3
         4 Description for 4
         5 Description for 5

5 rows selected.

Row: 1
Row: 2
Row: 3
Row: 4
Row: 5
OTHERS Handler
SQL>

Если вы планируете использовать обработчик исключений OTHERS, то для исключения NO_DATA_NEEDED необходимо задействовать специальное прерывание.

-- Построение конвейерной табличной функции.
CREATE OR REPLACE FUNCTION get_tab_ptf (p_rows IN NUMBER) RETURN t_tf_tab PIPELINED AS
BEGIN
  FOR i IN 1 .. p_rows LOOP
    DBMS_OUTPUT.put_line('Row: ' || i);
    PIPE ROW(t_tf_row(i, 'Description for ' || i));
  END LOOP;

  RETURN;
EXCEPTION
  WHEN NO_DATA_NEEDED THEN
    RAISE;
  WHEN OTHERS THEN
    DBMS_OUTPUT.put_line('OTHERS Handler');
    RAISE;
END;
/

-- Тестирование
SET SERVEROUTPUT ON

SELECT *
FROM   TABLE(get_tab_ptf(10))
WHERE  rownum <= 5;

        ID DESCRIPTION
---------- --------------------------------------------------
         1 Description for 1
         2 Description for 2
         3 Description for 3
         4 Description for 4
         5 Description for 5

5 rows selected.

Row: 1
Row: 2
Row: 3
Row: 4
Row: 5
SQL>

Исключение NO_DATA_NEEDED может быть также использовано для выполнения операций очистки (cleanup).

CREATE OR REPLACE FUNCTION get_tab_ptf (p_rows IN NUMBER) RETURN t_tf_tab PIPELINED AS
BEGIN
  my_package.initialize;

  FOR i IN 1 .. p_rows LOOP
    PIPE ROW(t_tf_row(i, 'Description for ' || i));
  END LOOP;

  RETURN;
EXCEPTION
  WHEN NO_DATA_NEEDED THEN
    my_package.cleanup;
    RAISE;
  WHEN OTHERS THEN
    DBMS_OUTPUT.put_line('OTHERS Handler');
    RAISE;
END;
/

Сравнение использования памяти
(Memory Usage Comparison)

Следующая функция возвращает текущее значение определенной статистики. Она позволит нам сравнивать память, используемую обычными и конвейерными табличными функциями.

CREATE OR REPLACE FUNCTION get_stat (p_stat IN VARCHAR2) RETURN NUMBER AS
  l_return  NUMBER;
BEGIN
  SELECT ms.value
  INTO   l_return
  FROM   v$mystat ms,
         v$statname sn
  WHERE  ms.statistic# = sn.statistic#
  AND    sn.name = p_stat;
  RETURN l_return;
END get_stat;
/

Сначала мы протестируем обычную табличную функцию, создав новое соединение и запросив большую коллекцию. Проверяя выделение памяти PGA как до, так и после, тест позволит нам увидеть, сколько памяти было выделено в результате проведения теста.

-- Создание новой сессии.
CONN test/test

-- Тестирование табличной функции.
SET SERVEROUTPUT ON
DECLARE
  l_start  NUMBER;
BEGIN
  l_start := get_stat('session pga memory');

  FOR cur_rec IN (SELECT *
                  FROM   TABLE(get_tab_tf(100000)))
  LOOP
    NULL;
  END LOOP;

  DBMS_OUTPUT.put_line('Regular table function : ' ||
                        (get_stat('session pga memory') - l_start));
END;
/
Regular table function : 22872064
[Обычная табличная функция: 22872064 ]

PL/SQL procedure successfully completed.

SQL>

Затем мы повторим тест для конвейерной табличной функции.

-- Создание новой сессии.
CONN test/test

-- Test pipelined table function.
SET SERVEROUTPUT ON
DECLARE
  l_start  NUMBER;
BEGIN
  l_start := get_stat('session pga memory');

  FOR cur_rec IN (SELECT *
                  FROM   TABLE(get_tab_ptf(100000)))
  LOOP
    NULL;
  END LOOP;

  DBMS_OUTPUT.put_line('Pipelined table function : ' ||
                        (get_stat('session pga memory') - l_start));
END;
/
Pipelined table function : 65536
[Конвейерная табличная функция: 65536 ]

PL/SQL procedure successfully completed.

SQL>

Сокращение памяти, используемой конвейерной табличной функцией, обусловлено тем, что она не требует разместить целую коллекцию в памяти.

Кардинальность
(Cardinality)

Oracle оценивает кардинальность (мощность, количество элементов) конвейерной табличной функции, базируясь на размере блока базы данных. Когда используется размер блока по умолчанию, оптимизатор всегда предполагает, что кардинальность - 8168 строк.

SET AUTOTRACE TRACE EXPLAIN

-- Возвращение 10 строк.
SELECT *
FROM   TABLE(get_tab_ptf(10));

Execution Plan
----------------------------------------------------------
Plan hash value: 822655197

-------------------------------------------------------------------------------------------------
| Id  | Operation                         | Name        | Rows  | Bytes | Cost (%CPU)| Time     |
-------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT                  |             |  8168 | 16336 |     8   (0)| 00:02:19 |
|   1 |  COLLECTION ITERATOR PICKLER FETCH| GET_TAB_PTF |  8168 | 16336 |     8   (0)| 00:02:19 |
-------------------------------------------------------------------------------------------------

SET AUTOTRACE OFF

Это прекрасно, если вы запрашиваете только конвейерную табличную функцию, но если планируется использовать ее в соединении, это может оказать негативное влияние на план выполнения.

Есть 4 способа, чтобы исправить оценку кардинальности конвейерной табличной функции:

Чтобы использовать расширяемый оптимизатор, в конвейерные табличные функции нужно вручную добавить параметр, который укажет оптимизатору использовать кардинальность.

CREATE OR REPLACE FUNCTION get_tab_ptf (p_cardinality IN INTEGER DEFAULT 1)
  RETURN t_tf_tab PIPELINED AS
BEGIN
  FOR i IN 1 .. 10 LOOP
    PIPE ROW (t_tf_row(i, 'Description for ' || i));
  END LOOP;

  RETURN;
END;
/

Заметим, что параметр p_cardinality нигде не используется непосредственно в функции.

Затем мы строим тип и тело типа, чтобы установить кардинальность (количество элементов) вручную. Обратите внимание на ссылку на параметр p_cardinality в типе.

CREATE OR REPLACE TYPE t_ptf_stats AS OBJECT (
  dummy INTEGER,
  
  STATIC FUNCTION ODCIGetInterfaces (
    p_interfaces OUT SYS.ODCIObjectList
  ) RETURN NUMBER,

  STATIC FUNCTION ODCIStatsTableFunction (
    p_function    IN  SYS.ODCIFuncInfo,
    p_stats       OUT SYS.ODCITabFuncStats,
    p_args        IN  SYS.ODCIArgDescList,
    p_cardinality IN INTEGER
  ) RETURN NUMBER
);
/

CREATE OR REPLACE TYPE BODY t_ptf_stats AS
  STATIC FUNCTION ODCIGetInterfaces (
    p_interfaces OUT SYS.ODCIObjectList
  ) RETURN NUMBER IS
  BEGIN
    p_interfaces := SYS.ODCIObjectList(
                      SYS.ODCIObject ('SYS', 'ODCISTATS2')
                    );
    RETURN ODCIConst.success;
  END ODCIGetInterfaces;

  STATIC FUNCTION ODCIStatsTableFunction (
                    p_function    IN  SYS.ODCIFuncInfo,
                    p_stats       OUT SYS.ODCITabFuncStats,
                    p_args        IN  SYS.ODCIArgDescList,
                    p_cardinality IN INTEGER
                  ) RETURN NUMBER IS
  BEGIN
    p_stats := SYS.ODCITabFuncStats(NULL);
    p_stats.num_rows := p_cardinality;
    RETURN ODCIConst.success;
  END ODCIStatsTableFunction;
END;
/

Этот тип может быть связан с любой конвейерной табличной функцией, используя следующую команду.

ASSOCIATE STATISTICS WITH FUNCTIONS get_tab_ptf USING t_ptf_stats;

Мы знаем, что функция возвращает 10 строк, но оптимизатор этого не знает. Независимо от числа строк, возвращенных функцией, оптимизатор использует значение параметра p_cardinality как оценку количества элементов (кардинальности).

SET AUTOTRACE TRACE EXPLAIN

SELECT *
FROM   TABLE(get_tab_ptf(p_cardinality => 10));

Execution Plan
----------------------------------------------------------
Plan hash value: 822655197

-------------------------------------------------------------------------------------------------
| Id  | Operation                         | Name        | Rows  | Bytes | Cost (%CPU)| Time     |
-------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT                  |             |    10 |    20 |     8   (0)| 00:02:19 |
|   1 |  COLLECTION ITERATOR PICKLER FETCH| GET_TAB_PTF |    10 |    20 |     8   (0)| 00:02:19 |
-------------------------------------------------------------------------------------------------

SELECT *
FROM   TABLE(get_tab_ptf(p_cardinality => 10000));

Execution Plan
----------------------------------------------------------
Plan hash value: 822655197

-------------------------------------------------------------------------------------------------
| Id  | Operation                         | Name        | Rows  | Bytes | Cost (%CPU)| Time     |
-------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT                  |             | 10000 | 20000 |     8   (0)| 00:02:19 |
|   1 |  COLLECTION ITERATOR PICKLER FETCH| GET_TAB_PTF | 10000 | 20000 |     8   (0)| 00:02:19 |
-------------------------------------------------------------------------------------------------

SET AUTOTRACE OFF

Неявные (теневые) типы
(Implicit (Shadow) Types)

В отличие от обычных табличных функций, конвейерные табличные функции могут быть определены с использованием типов "table" и "record", определенных в спецификации пакета.

-- Удаление ранее созданных объектов.
DROP FUNCTION get_tab_tf;
DROP FUNCTION get_tab_ptf;
DROP TYPE t_tf_tab;
DROP TYPE t_tf_row;

-- Построение пакета, содержащего внутренние типы "record" и "table".
CREATE OR REPLACE PACKAGE ptf_api AS
  TYPE t_ptf_row IS RECORD (
    id           NUMBER,
    description  VARCHAR2(50)
  );

  TYPE t_ptf_tab IS TABLE OF t_ptf_row;

  FUNCTION get_tab_ptf (p_rows IN NUMBER) RETURN t_ptf_tab PIPELINED;
END;
/

CREATE OR REPLACE PACKAGE BODY ptf_api AS

  FUNCTION get_tab_ptf (p_rows IN NUMBER) RETURN t_ptf_tab PIPELINED IS
    l_row  t_ptf_row;
  BEGIN
    FOR i IN 1 .. p_rows LOOP
      l_row.id := i;
      l_row.description := 'Description for ' || i;
      PIPE ROW (l_row);
    END LOOP;
  
    RETURN;
  END;
END;
/

SELECT *
FROM   TABLE(ptf_api.get_tab_ptf(10))
ORDER BY id DESC;

        ID DESCRIPTION
---------- --------------------------------------------------
        10 Description for 10
         9 Description for 9
         8 Description for 8
         7 Description for 7
         6 Description for 6
         5 Description for 5
         4 Description for 4
         3 Description for 3
         2 Description for 2
         1 Description for 1

10 rows selected.

SQL>

Это представляется более правильным решением, чем построение всех типов базы данных вручную, а Oracle по умолчанию строит теневые объектные типы неявно.

COLUMN object_name FORMAT A30

SELECT object_name, object_type
FROM   user_objects;

OBJECT_NAME                    OBJECT_TYPE
------------------------------ -------------------
PTF_API                        PACKAGE BODY
SYS_PLSQL_82554_9_1            TYPE
SYS_PLSQL_82554_DUMMY_1        TYPE
SYS_PLSQL_82554_24_1           TYPE
PTF_API                        PACKAGE

5 rows selected.

SQL>

Как можно видеть, Oracle фактически создал три теневых объектных типа с системно сгенерированными именами для поддержки типов, требуемых конвейерной табличной функцией. По этой причине я всегда строю именованные объектные типы базы данных, вместо того, чтобы полагаться на неявные типы.

Конвейерные табличные функции, запускаемые параллельно
(Parallel Enabled Pipelined Table Functions)

Чтобы включить параллельные конвейерные табличные функции, должны быть выполнены следующие условия.

Основной синтаксис показан ниже.

CREATE FUNCTION function-name(parameter-name ref-cursor-type)
  RETURN rec_tab_type PIPELINED
  PARALLEL_ENABLE(PARTITION parameter-name BY [{HASH | RANGE} (column-list) | ANY ]) IS
BEGIN
  ...
END;

Чтобы увидеть это в действии, сначала надо создать и населить тестовую таблицу.

CREATE TABLE parallel_test (
  id           NUMBER(10),
  country_code VARCHAR2(5),
  description  VARCHAR2(50)
);

INSERT /*+ APPEND */ INTO parallel_test
SELECT level AS id,
       (CASE TRUNC(MOD(level, 4))
         WHEN 1 THEN 'IN'
         WHEN 2 THEN 'UK'
         ELSE 'US'
        END) AS country_code,
       'Description or ' || level AS description
FROM   dual
CONNECT BY level <= 100000;
COMMIT;

-- Проверка данных.
SELECT country_code, count(*) FROM parallel_test GROUP BY country_code;

COUNT   COUNT(*)
----- ----------
US         50000
IN         25000
UK         25000

3 rows selected.

SQL>

Следующий пакет определяет включенные параллельные конвейерные табличные функции, которые принимают ref-курсоры по запросу из тестовой таблицы и возвращают те же самые строки вместе с SID (системный идентификатор) сессии, которая их обработала. Можно было бы использовать слабо связанный ref-курсор, подобный SYS_REFCURSOR, но тогда мы были бы ограничены только методом секционирования SYS_REFCURSOR. Следующие три функции представляют три метода секционирования.

CREATE OR REPLACE PACKAGE parallel_ptf_api AS

  TYPE t_parallel_test_row IS RECORD (
    id             NUMBER(10),
    country_code   VARCHAR2(5),
    description    VARCHAR2(50),
    sid            NUMBER
  );

  TYPE t_parallel_test_tab IS TABLE OF t_parallel_test_row;

  TYPE t_parallel_test_ref_cursor IS REF CURSOR RETURN parallel_test%ROWTYPE;
  
  FUNCTION test_ptf_any (p_cursor  IN  t_parallel_test_ref_cursor)
    RETURN t_parallel_test_tab PIPELINED
    PARALLEL_ENABLE(PARTITION p_cursor BY ANY);
    
  FUNCTION test_ptf_hash (p_cursor  IN  t_parallel_test_ref_cursor)
    RETURN t_parallel_test_tab PIPELINED
    PARALLEL_ENABLE(PARTITION p_cursor BY HASH (country_code));
    
  FUNCTION test_ptf_range (p_cursor  IN  t_parallel_test_ref_cursor)
    RETURN t_parallel_test_tab PIPELINED
    PARALLEL_ENABLE(PARTITION p_cursor BY RANGE (country_code));
    
END parallel_ptf_api;
/

CREATE OR REPLACE PACKAGE BODY parallel_ptf_api AS

  FUNCTION test_ptf_any (p_cursor  IN  t_parallel_test_ref_cursor)
    RETURN t_parallel_test_tab PIPELINED
    PARALLEL_ENABLE(PARTITION p_cursor BY ANY)
  IS
    l_row  t_parallel_test_row;
  BEGIN
    LOOP
      FETCH p_cursor
      INTO  l_row.id,
            l_row.country_code,
            l_row.description;
      EXIT WHEN p_cursor%NOTFOUND;
      
      SELECT sid
      INTO   l_row.sid
      FROM   v$mystat
      WHERE  rownum = 1;
      
      PIPE ROW (l_row);
    END LOOP;
    RETURN;
  END test_ptf_any;

  FUNCTION test_ptf_hash (p_cursor  IN  t_parallel_test_ref_cursor)
    RETURN t_parallel_test_tab PIPELINED
    PARALLEL_ENABLE(PARTITION p_cursor BY HASH (country_code))
  IS
    l_row  t_parallel_test_row;
  BEGIN
    LOOP
      FETCH p_cursor
      INTO  l_row.id,
            l_row.country_code,
            l_row.description;
      EXIT WHEN p_cursor%NOTFOUND;
      
      SELECT sid
      INTO   l_row.sid
      FROM   v$mystat
      WHERE  rownum = 1;
      
      PIPE ROW (l_row);
    END LOOP;
    RETURN;
  END test_ptf_hash;

  FUNCTION test_ptf_range (p_cursor  IN  t_parallel_test_ref_cursor)
    RETURN t_parallel_test_tab PIPELINED
    PARALLEL_ENABLE(PARTITION p_cursor BY RANGE (country_code))
  IS
    l_row  t_parallel_test_row;
  BEGIN
    LOOP
      FETCH p_cursor
      INTO  l_row.id,
            l_row.country_code,
            l_row.description;
      EXIT WHEN p_cursor%NOTFOUND;
      
      SELECT sid
      INTO   l_row.sid
      FROM   v$mystat
      WHERE  rownum = 1;
      
      PIPE ROW (l_row);
    END LOOP;
    RETURN;
  END test_ptf_range;
      
END parallel_ptf_api;
/

Следующий запрос использует функцию CURSOR, чтобы преобразовать запрос к тестовой таблице в ref-курсор, который передан табличной функции в качестве параметра. Результаты группируются по SID сессий, которые обрабатывают строки. Отметим, что все строки обработаны одной и той же сессией. Почему? Потому что, хотя эта функция включена как параллельная, мы не указали ей работать параллельно.

SELECT sid, count(*)
FROM   TABLE(parallel_ptf_api.test_ptf_any(CURSOR(SELECT * FROM parallel_test t1))) t2
GROUP BY sid;

       SID   COUNT(*)
---------- ----------
        31     100000

1 row selected.

SQL>

Следующие запросы включают хинт параллельности и вызывают все эти функции.

SELECT country_code, sid, count(*)
FROM   TABLE(parallel_ptf_api.test_ptf_any(CURSOR
        (SELECT /*+ parallel(t1, 5) */  * FROM   parallel_test t1))) t2
GROUP BY country_code,sid
ORDER BY country_code,sid;

COUNT        SID   COUNT(*)
----- ---------- ----------
IN            23       4906
IN            26       5219
IN            41       4847
IN            42       4827
IN            43       5201
UK            23       4906
UK            26       5218
UK            41       4848
UK            42       4826
UK            43       5202
US            23       9811
US            26      10437
US            41       9695
US            42       9655
US            43      10402

15 rows selected.

SQL>

SELECT country_code, sid, count(*)
FROM   TABLE(parallel_ptf_api.test_ptf_hash(CURSOR
        (SELECT /*+ parallel(t1, 5) */  * FROM   parallel_test t1))) t2
GROUP BY country_code,sid
ORDER BY country_code,sid;

COUNT        SID   COUNT(*)
----- ---------- ----------
IN            29      25000
UK            38      25000
US            40      50000

3 rows selected.

SQL>

SELECT country_code, sid, count(*)
FROM   TABLE(parallel_ptf_api.test_ptf_range(CURSOR
        (SELECT /*+ parallel(t1, 5) */ * FROM   parallel_test t1))) t2
GROUP BY country_code,sid
ORDER BY country_code,sid;

COUNT        SID   COUNT(*)
----- ---------- ----------
IN            40      25000
UK            23      25000
US            41      50000

3 rows selected.

SQL>

Уровень параллелизма (DOP — degree of parallelism) может быть ниже чем тот, который указан в хинте.

Дополнительная фраза управления выходным потоком быть использована для упорядочения или кластеризации (объединение в группы) — order or cluster — данных, основанной на списке столбцов, в процессе серверной обработки. Это может быть необходимым, если существуют зависимости в данных. Например, нужно секционировать по определенному столбцу, но также и требовать, чтобы строки были обработаны в определенном порядке в рамках этого же секционирования. Расширенный синтаксис такого случая показан ниже.

CREATE FUNCTION function-name(parameter-name ref-cursor-type)
  RETURN rec_tab_type PIPELINED
  PARALLEL_ENABLE(PARTITION parameter-name BY [{HASH | RANGE} (column-list) | ANY ]) 
  [ORDER | CLUSTER] parameter-name BY (column-list) IS
BEGIN
  ...
END;

Можно сделать нечто подобное:

FUNCTION test_ptf_hash (p_cursor  IN  t_parallel_test_ref_cursor)
  RETURN t_parallel_test_tab PIPELINED
  PARALLEL_ENABLE(PARTITION p_cursor BY HASH (country_code))
  ORDER p_cursor BY (country_code, created_date);

FUNCTION test_ptf_hash (p_cursor  IN  t_parallel_test_ref_cursor)
  RETURN t_parallel_test_tab PIPELINED
  PARALLEL_ENABLE(PARTITION p_cursor BY HASH (country_code))
  CLUSTER p_cursor BY (country_code, created_date);

Трансформация конвейеров
(Transformation Pipelines)

В традиционных ETL-процессах необходимо сначала загрузить данные в промежуточную область, затем сделать по ней несколько проходов, чтобы преобразовать и переместить данные в область, откуда они будут загружены в схему назначения. Прохождение данных через промежуточные таблицы может потребовать значительного количества операций дискового ввода/вывода, как для загружаемых данных, так и для данных redo-журнала. Альтернативой должно стать выполнение преобразования конвейерными табличными функциями, поскольку данные читаются из внешней таблицы и вставляются непосредственно в таблицу назначения, сокращая большую часть операций дискового ввода/вывода.

В этой секции мы увидим и проэкзаменуем с использованием обсуждавшихся ранее методов трансформацию конвейера.

Сначала в виде плоского файла нужно выкачать из файловой системы сервера базы данных какие-либо тестовые данные.

SET PAGESIZE 0
SET FEEDBACK OFF
SET LINESIZE 1000
SET TRIMSPOOL ON
SPOOL /tmp/tp_test.txt
SELECT owner || ',' || object_name || ',' || object_type || ',' || status
FROM   all_objects;
SPOOL OFF
SET FEEDBACK ON
SET PAGESIZE 24

Создаем объект "directory", где указывается местоположение этого файла, создаем внешнюю таблицу, чтобы прочитать файл, и создаем таблицу назначения.

-- Создание директории, указывающей на плоский файл. 
CONN / AS SYSDBA
CREATE OR REPLACE DIRECTORY data_load_dir AS '/tmp/';
GRANT READ, WRITE ON DIRECTORY data_load_dir TO test;

CONN test/test
-- Создание внешней таблицы.
DROP TABLE tp_test_ext;
CREATE TABLE tp_test_ext (
  owner                    VARCHAR2(30),
  object_name              VARCHAR2(30),
  object_type              VARCHAR2(19),
  status                   VARCHAR2(7)
)
ORGANIZATION EXTERNAL
(
  TYPE ORACLE_LOADER
  DEFAULT DIRECTORY data_load_dir
  ACCESS PARAMETERS
  (
    RECORDS DELIMITED BY NEWLINE
    BADFILE data_load_dir:'tp_test_%a_%p.bad'
    LOGFILE data_load_dir:'tp_test_%a_%p.log'
    FIELDS TERMINATED BY ','
    MISSING FIELD VALUES ARE NULL
    (
      owner                    CHAR(30),
      object_name              CHAR(30),
      object_type              CHAR(19),
      status                   CHAR(7)
    )
  )
  LOCATION ('tp_test.txt')
)
PARALLEL 10
REJECT LIMIT UNLIMITED
/

-- Создание таблицы как заключительного назначения для данных. 
CREATE TABLE tp_test (
  owner                    VARCHAR2(30),
  object_name              VARCHAR2(30),
  object_type              VARCHAR2(19),
  status                   VARCHAR2(7),
  extra_1                  NUMBER,
  extra_2                  NUMBER
);

Заметим, что в таблице назначения по сравнению с внешней таблицей есть два дополнительных столбца. Каждый из этих столбцов представляет шаг преобразования. Фактические преобразования в этом примере тривиальны, но следует представить, что они могут быть сложными и невыполнимыми одним SQL-предложением. Следовательно, имеет место потребность в табличных функциях.

Пакет ниже определяет два шага процесса преобразования и процедуры для его запуска.

CREATE OR REPLACE PACKAGE tp_api AS

  TYPE t_step_1_in_rc IS REF CURSOR RETURN tp_test_ext%ROWTYPE;
  
  TYPE t_step_1_out_row IS RECORD (
    owner                    VARCHAR2(30),
    object_name              VARCHAR2(30),
    object_type              VARCHAR2(19),
    status                   VARCHAR2(7),
    extra_1                  NUMBER
  );
  
  TYPE t_step_1_out_tab IS TABLE OF t_step_1_out_row;

  TYPE t_step_2_in_rc IS REF CURSOR RETURN t_step_1_out_row;

  TYPE t_step_2_out_tab IS TABLE OF tp_test%ROWTYPE;

  FUNCTION step_1 (p_cursor  IN  t_step_1_in_rc)
    RETURN t_step_1_out_tab PIPELINED
    PARALLEL_ENABLE(PARTITION p_cursor BY ANY);

  FUNCTION step_2 (p_cursor  IN  t_step_2_in_rc)
    RETURN t_step_2_out_tab PIPELINED
    PARALLEL_ENABLE(PARTITION p_cursor BY ANY);

  PROCEDURE load_data;

END tp_api;
/


CREATE OR REPLACE PACKAGE BODY tp_api AS

  FUNCTION step_1 (p_cursor  IN  t_step_1_in_rc)
    RETURN t_step_1_out_tab PIPELINED
    PARALLEL_ENABLE(PARTITION p_cursor BY ANY)
  IS
    l_row  t_step_1_out_row;
  BEGIN
    LOOP
      FETCH p_cursor
      INTO  l_row.owner,
            l_row.object_name,
            l_row.object_type,
            l_row.status;
      EXIT WHEN p_cursor%NOTFOUND;
      
      -- Do some work here.
      l_row.extra_1 := p_cursor%ROWCOUNT;
      PIPE ROW (l_row);
    END LOOP;
    RETURN;
  END step_1;


  FUNCTION step_2 (p_cursor  IN  t_step_2_in_rc)
    RETURN t_step_2_out_tab PIPELINED
    PARALLEL_ENABLE(PARTITION p_cursor BY ANY)
  IS
    l_row  tp_test%ROWTYPE;
  BEGIN
    LOOP
      FETCH p_cursor
      INTO  l_row.owner,
            l_row.object_name,
            l_row.object_type,
            l_row.status,
            l_row.extra_1;
      EXIT WHEN p_cursor%NOTFOUND;
      
      -- Do some work here.
      l_row.extra_2 := p_cursor%ROWCOUNT;
      PIPE ROW (l_row);
    END LOOP;
    RETURN;
  END step_2;


  PROCEDURE load_data IS
  BEGIN
    EXECUTE IMMEDIATE 'TRUNCATE TABLE tp_test';
    
    INSERT /*+ APPEND */ INTO tp_test
    SELECT *
    FROM   TABLE(step_2(CURSOR(SELECT *
                               FROM   TABLE(step_1(CURSOR(SELECT *
                                                          FROM   tp_test_ext t1
                                                          )
                                                   )
                                            ) t2
                               )
                        )
                 ) t3;
    COMMIT;
  END load_data;

END tp_api;
/

Вставка внутри процедуры LOAD_DATA полностью выполняет загрузку данных, включая преобразования. Предложение выглядит довольно сложно, но оно состоит из следующих простых шагов.

Применяя процедуру LOAD_DATA, можно как преобразовывать, так и загружать данные.

EXEC tp_api.load_data;

PL/SQL procedure successfully completed.

SQL>

-- Проверка числа строк во внешней таблице.
SELECT COUNT(*) FROM tp_test_ext;

  COUNT(*)
----------
     56059

1 row selected.

SQL> 

-- Compare to the destination table.Сравнение с таблицей назначения.
SELECT COUNT(*) FROM tp_test;

  COUNT(*)
----------
     56059

1 row selected.

SQL>

Заметим, что этот пример не содержит процедуры обработки ошибок и что в нем нет хинтов параллельности, чтобы упростить запрос в процедуре LOAD_DATA.

Для получения дополнительной информации обратитесь к документации:

Надеюсь, это вам поможет. С приветом Tim...