Monday, September 14, 2009

Merge Rows as Columns / Transpose records


Requirement: Converting rows to columns


Source rows:

Customer   Product   Cost
Cust1      P1        10
Cust1      P2        20
Cust1      P3        30
Cust2      ABC       10
Cust2      P2        25
Cust2      Def       10

Flattened rows:

Customer   Product1  Cost1  Product2  Cost2  Product3  Cost3
Cust1      P1        10     P2        20     P3        30
Cust2      ABC       10     P2        25     Def       10

The illustration above shows the requirement: multiple records had to be merged into a single record based on certain criteria. The design had to be reusable, since each dimension within the data mart required this flattening logic.
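As a minimal sketch of the requirement itself (not of the Informatica design), the Python below transposes the sample rows. The function name flatten and the max_sets parameter are hypothetical, and the input is assumed to be sorted by customer already.

```python
from itertools import groupby

def flatten(rows, max_sets=3):
    """Merge adjacent rows that share a customer into one wide record."""
    wide = []
    # groupby only merges neighbouring rows, hence the sorted-input assumption.
    for customer, group in groupby(rows, key=lambda r: r[0]):
        record = [customer]
        for _, product, cost in list(group)[:max_sets]:
            record += [product, cost]
        # Pad unused Product/Cost slots so every record has the same width.
        record += [None] * (1 + 2 * max_sets - len(record))
        wide.append(tuple(record))
    return wide

rows = [
    ("Cust1", "P1", 10), ("Cust1", "P2", 20), ("Cust1", "P3", 30),
    ("Cust2", "ABC", 10), ("Cust2", "P2", 25), ("Cust2", "Def", 10),
]
flat = flatten(rows)
```

Here flat holds one tuple per customer, laid out as Customer, Product1, Cost1, Product2, Cost2, Product3, Cost3.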

1. Approach:
An Aggregator transformation can group the records by a key, but retrieving the values of a particular column as individual columns is a challenge. We therefore designed a component, ‘Flattener’, based on the Expression transformation.
Flattener is a reusable component: a mapplet that flattens records.
It consists of an Expression and a Filter transformation. The Expression combines each incoming record with the earlier ones based on certain logic. The Filter transformation decides whether the record is written to the target.

2. Design:
The mapplet can receive up to five inputs, of the following data types:
i_Col1 (string), Customer
i_Col2 (string), Product
i_Col3 (decimal), Cost
i_Col4 (decimal) and
i_Col5 (date/time)
The names are kept generic and cover different data types, so that the mapplet can be used in any scenario where records need to be flattened.
The mapplet produces 15 sets of 5 output ports each (15×5), in the following manner:
o_F1_1 (string), Customer
o_F2_1 (string), Product1
o_F3_1 (decimal), Cost1
o_F4_1 (decimal) and
o_F5_1 (date/time)
o_F1_2 (string), Customer
o_F2_2 (string), Product2
o_F3_2 (decimal), Cost2
o_F4_2 (decimal) and
o_F5_2 (date/time)
… … and so on
The output record has repeating sets of 5 columns each (each set corresponds to one incoming row). The number of sets can be increased if the requirement calls for it, and only the required fields need to be mapped. For the above example we use just two strings and one decimal, mapping Customer, Product and Cost.
The mapplet receives records from its parent mapping. The Expression first saves each incoming value to a variable and compares it with its counterpart from the previous record, which stays in the transformation's cache for as long as the condition to flatten is satisfied.
Syntax to store current and previous values (i = input port, v = variable port):
i_Col2      string   i
prv_Col2    string   v   curr_Col2
curr_Col2   string   v   i_Col2
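The order of the two variable ports matters: prv_Col2 is listed above curr_Col2, so it reads the value curr_Col2 still holds from the previous row before curr_Col2 is overwritten. A small Python sketch of that idea follows; with_previous is a hypothetical name, and None merely stands in for whatever initial value the variable port has.

```python
def with_previous(values):
    """Pair each incoming value with the value from the row before it."""
    prv = None
    curr = None
    pairs = []
    for incoming in values:
        prv = curr        # like prv_Col2: evaluated first, sees the old curr
        curr = incoming   # like curr_Col2: then takes the incoming value
        pairs.append((prv, curr))
    return pairs

pairs = with_previous(["P1", "P2", "P3"])
```

Swapping the two assignments would make prv equal curr on every row, which is why the port order in the Expression cannot be changed.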
The condition used to flatten records is parameterized and decided before the mapping is called, which makes the code easier to reuse. The parameterized logic is passed to the Expression transformation through a mapplet parameter. Its value is evaluated as an expression, and the result is a flag value of either ‘1’ or ‘2’.
Syntax for port – flag
Flag integer v $$Expr_compare
An example for parameterized expression
$$Expr_compare = iif (curr_Col1 = prv_Col1 AND curr_Col2 != prv_Col2, 1, iif (curr_Col1 != prv_Col1, 2))
A variable port named “rec_count” is incremented when the flag is 1 and reset to 0 when the flag is 2.
Syntax for port – rec_count
rec_count integer v iif (flag=2,0, iif (flag=1,rec_count + 1,rec_count))
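To see how flag and rec_count evolve, the sketch below replays the example $$Expr_compare and the rec_count expression over the sample rows in Python. The names compare and trace are hypothetical, and None stands in for the empty previous values before the first row.

```python
def compare(prv, curr):
    """Mirror of the example $$Expr_compare on (Col1, Col2) pairs."""
    if curr[0] == prv[0] and curr[1] != prv[1]:
        return 1          # same key, new value: keep flattening
    if curr[0] != prv[0]:
        return 2          # key changed: a new group starts
    return None           # inner iif has no else branch

def trace(rows):
    prv = (None, None)
    rec_count = 0
    steps = []
    for curr in rows:
        flag = compare(prv, curr)
        if flag == 2:
            rec_count = 0
        elif flag == 1:
            rec_count += 1
        steps.append((curr[0], curr[1], flag, rec_count))
        prv = curr
    return steps

steps = trace([
    ("Cust1", "P1"), ("Cust1", "P2"), ("Cust1", "P3"),
    ("Cust2", "ABC"), ("Cust2", "P2"), ("Cust2", "Def"),
])
```

Each entry of steps is (Customer, Product, flag, rec_count); rec_count gives the index of the output set that the row belongs to.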
The Expression transformation now uses the values of the ports “flag” and “rec_count” to decide the placeholder for each incoming value, i.e. the target-table column into which the value will ultimately move. The process is iterative and continues for as long as the comparison logic ($$Expr_compare) holds, i.e. until all records are flattened per the logic. An example of a placeholder expression is shown below:
v_Field1 data type v iif(flag=2 AND rec_count=0,curr_Col1, v_Field1)
Port “write_flag_1” is set to 1 when the comparison logic fails (meaning flattening is complete). The Filter transformation passes a record to the target only once it is completely transposed.
Filter condition:
write_flag_1 integer v iif (flag=2 AND write_flag>1 ,1,0)

3. Outcome:
After developing and implementing this code we found it to be a useful utility, so we thought of sharing it. We would like to hear suggestions from readers on other ways of achieving the same functionality. Please do share your views.

Wednesday, September 2, 2009

Process Control / Audit of Workflows in Informatica


1. Process Control – Definition
Process control, or auditing, of a workflow in Informatica means capturing job information such as start time, end time, read count, insert count, update count and delete count. This information is captured and written to a table as the workflow executes.

2. Structure of Process Control/Audit table
The structure of the process control table is given below.
Table 1: Process Control structure
Column name       Data type     Length   Description
PROCESS_RUN_ID    Number(p,s)   11       A unique number used to identify a specific process run.
PROCESS_NME       Varchar2      120      The name of the process (this column is populated with the names of the Informatica mappings).
START_TMST        Date          19       The date/time when the process started.
END_TMST          Date          19       The date/time when the process ended.
ROW_READ_CNT      Number(p,s)   16       The number of rows read by the process.
ROW_INSERT_CNT    Number(p,s)   16       The number of rows inserted by the process.
ROW_UPDATE_CNT    Number(p,s)   16       The number of rows updated by the process.
ROW_DELETE_CNT    Number(p,s)   16       The number of rows deleted by the process.
ROW_REJECT_CNT    Number(p,s)   16       The number of rows rejected by the process.
USER_ID           Varchar2      32       The ETL user identifier associated with the process.
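For readers who want to try the structure out, here is a rough sketch that creates an equivalent table in an in-memory SQLite database from Python. The table name process_control, the SQLite column types and the primary key are assumptions of this sketch; the original table uses Oracle-style Number, Varchar2 and Date columns.

```python
import sqlite3

# Assumed mapping: Number -> INTEGER, Varchar2/Date -> TEXT.
DDL = """
CREATE TABLE process_control (
    process_run_id  INTEGER PRIMARY KEY,  -- unique id of a process run
    process_nme     TEXT,                 -- process (mapping) name
    start_tmst      TEXT,                 -- start date/time
    end_tmst        TEXT,                 -- end date/time
    row_read_cnt    INTEGER,
    row_insert_cnt  INTEGER,
    row_update_cnt  INTEGER,
    row_delete_cnt  INTEGER,
    row_reject_cnt  INTEGER,
    user_id         TEXT                  -- ETL user identifier
)
"""

conn = sqlite3.connect(":memory:")
conn.execute(DDL)

# Read the column names back to confirm the layout.
columns = [row[1] for row in conn.execute("PRAGMA table_info(process_control)")]
```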
3. Mapping Logic and Build Steps
The process control flow has two data flows: an insert flow and an update flow. The insert flow runs before the main mapping and the update flow runs after it; this order is set in the “Target Load Plan”. The source for both flows can be a dummy source that returns one record, for example select ‘process’ from dual or select count(1) from Table_A. The following mapping parameters and variables are to be created:
Table 2: Mapping Parameter and variables
$$PROCESS_ID
$$PROCESS_NAME
$$INSERT_COUNT
$$UPDATE_COUNT
$$DELETE_COUNT
$$REJECT_COUNT
Steps to create Insert flow:
  • 1. Use “select ‘process’ from dual” as the SQL query in the source qualifier
  • 2. Use a sequence generator to create running process_run_id values
  • 3. In an expression, call SetVariable ($$PROCESS_RUN_ID,NEXTVAL) and assign $$PROCESS_NAME to o_process_name, an output-only field
  • 4. In an expression, assign $$SessionStarttime to o_Starttime, an output-only field
  • 5. In an expression, accept the sequence id from the sequence generator
  • 6. Insert all three of the above values into the target process control table
Table 3: Process Control Image after Insert flow

PROCESS_RUN_ID    1
PROCESS_NME       VENDOR_DIM_LOAD
START_TMST        8/23/2009 12:23
END_TMST
ROW_READ_CNT
ROW_INSERT_CNT
ROW_UPDATE_CNT
ROW_DELETE_CNT
ROW_REJECT_CNT
USER_ID           INFA8USER
Steps in the main mapping:
  • 1. After the source qualifier, increment the read count in a variable (v_read_count) in an expression for each record read, and call SetMaxVariable ($$READ_COUNT,v_read_count)
  • 2. Before the update strategy of each target instance, do the same for the insert/update/delete counts; all the variables then hold their respective counts
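The counting pattern in these steps can be sketched in Python as below. This is only an illustration: set_max_variable is a hypothetical stand-in for SetMaxVariable (it keeps the larger of the stored and the new value), and count_reads is a hypothetical helper, not part of the mapping.

```python
def set_max_variable(current, value):
    """Stand-in for SetMaxVariable: keep the larger of the two values."""
    return value if value > current else current

def count_reads(rows):
    read_count_var = 0   # plays the role of $$READ_COUNT
    v_read_count = 0     # plays the role of the variable port v_read_count
    for _ in rows:
        v_read_count += 1
        read_count_var = set_max_variable(read_count_var, v_read_count)
    return read_count_var

total = count_reads(["r1", "r2", "r3"])
```

Because the running counter only grows, taking the maximum leaves the variable holding the final row count.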
Steps to create Update flow:
  • 1. Use “select ‘process’ from dual” as the SQL query in the source qualifier
  • 2. Use SetMaxVariable to get the process_run_id created in the insert flow
  • 3. In an expression, assign $$INSERT_COUNT to o_insert_count, an output-only field; assign all the other counts in the same way
  • 4. In an expression, assign $$SessionEndtime to o_Endtime, an output-only field
  • 5. Update the target ‘Process Control Table’ with all of the above values where process_run_id equals the process_run_id generated in the insert flow
Table 4: Process Control Image after Update flow
PROCESS_RUN_ID    1
PROCESS_NME       VENDOR_DIM_LOAD
START_TMST        8/23/2009 12:23
END_TMST          8/23/2009 12:30
ROW_READ_CNT      1000
ROW_INSERT_CNT    900
ROW_UPDATE_CNT    60
ROW_DELETE_CNT    40
ROW_REJECT_CNT    0
USER_ID           INFA8USER
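The insert-then-update pattern can be tried outside Informatica with a short Python sketch against an in-memory SQLite table. The function names insert_flow and update_flow, the table name process_control and the SQLite types are assumptions of this sketch; in the mappings the same work is done by the sequence generator, mapping variables and target updates described above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE process_control (
        process_run_id INTEGER PRIMARY KEY,
        process_nme TEXT, start_tmst TEXT, end_tmst TEXT,
        row_read_cnt INTEGER, row_insert_cnt INTEGER, row_update_cnt INTEGER,
        row_delete_cnt INTEGER, row_reject_cnt INTEGER, user_id TEXT
    )
""")

def insert_flow(conn, process_name, user_id, start_time):
    """Runs before the main load: open an audit row and return its run id."""
    cur = conn.execute(
        "INSERT INTO process_control (process_nme, start_tmst, user_id) "
        "VALUES (?, ?, ?)",
        (process_name, start_time, user_id),
    )
    return cur.lastrowid

def update_flow(conn, run_id, end_time, counts):
    """Runs after the main load: close the audit row with end time and counts."""
    conn.execute(
        "UPDATE process_control SET end_tmst = ?, row_read_cnt = ?, "
        "row_insert_cnt = ?, row_update_cnt = ?, row_delete_cnt = ?, "
        "row_reject_cnt = ? WHERE process_run_id = ?",
        (end_time, counts["read"], counts["insert"], counts["update"],
         counts["delete"], counts["reject"], run_id),
    )

run_id = insert_flow(conn, "VENDOR_DIM_LOAD", "INFA8USER", "8/23/2009 12:23")
after_insert = conn.execute("SELECT * FROM process_control").fetchone()

update_flow(conn, run_id, "8/23/2009 12:30",
            {"read": 1000, "insert": 900, "update": 60, "delete": 40, "reject": 0})
after_update = conn.execute("SELECT * FROM process_control").fetchone()
```

The two fetched rows correspond to the table images after the insert flow and after the update flow.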

4. Merits over Informatica Metadata
This information is also available in the Informatica metadata; however, maintaining it within our own system has the following benefits:
  • No need to write complex queries to bring in the data from the metadata tables
  • Job names need not be mapping names and can be user-friendly names
  • Insert/delete/update counts can be audited for all targets as well as for individual targets
  • The audit information can be maintained outside the metadata security level and can be used by other mappings in their transformations
  • Can be used by mappings that build parameter files
  • Can be used by mappings that govern data volume
  • Can be used by production support to check the status of a load quickly