Monday, 2 July 2012

ETL Cleansing : Remove Whitespace

I'm working on a green field data warehouse project with it's fair share of legacy data sources. I know the SQL Server based sources are of average quality but I'm expecting quite a lot of CSV files from external entities and I'll have minimal control over their data quality.

None of this is unusual. Therefore my solution is to produce an ETL cleansing mechanism which can accept any data source and use a process control table to iterate through predefined rules. As an example, I've written a sproc which removes white space from the front or back of a string. This is an easy thing to do but my requirement is to create a solution that can work with or without SSIS and which can log all the changes it makes in an audit schema. I also want it to be fast so it has to work set based not RBAR.

An excellent set of tools from Pragmatic Works does the same using SSIS. I'd like the choice of using a T-SQL or SSIS approach so I'm building my own plus my organisation is new to the data warehouse world and budgets are stretched. If you're working for an enterprise class business or an organisation with buy the Pragmatic Works tools.

I'll step through the solution then show then show the code.
  1. The procedure takes a number of parameters. @Database, @Schema, @Table and @Column specify what will be cleansed. @TrimLeadingWhitespace and @TrimTrailingWhitespace parameters are switches, the @PrintOrExecute allows us to debug the Dynamic SQL by calling the procedure and producing but not executing the code. @BatchLogID is optional as we'll generate a BatchLogID is it's not passed.
  2. After setting up the variables required to store the Dynamic SQL and for use in the loop I initiate a BatchLogID which is used in my Audit metadata to track all changes.
  3. I then begin to build a dynamic SQL string which will use the stored procedure parameters to update the required column in the specified table. This can be quite complex hence the Print or Execute flag which is invaluable during development.
  4. Because I'll log what data was changed in a set based operation I want to created a string containing the primary key columns and values for each row altered. To do this over any input parameters I've got another procedure called GetPrimaryKey which given the details of a table will return a comma separated list of the PK columns. I then reconstruct this PK string with the actual values of the row being updated appended into the string.
  5. The follow step appends an OUTPUT statement to the foot of the UPDATE which will INSERT verbose logging information into a log location defined earlier in the sproc.
  6. Finally I construct the Dynamic SQL and either Print or Execute.
To test it simply plug in some AdventureWorks details and set the @PrintOrExecute flag to 'P' like this;

 EXEC Clean.RemoveWhitespace NULL,'AdventureWorks','Person','Address','AddressLine2','Y','Y','P'  

And here is the code. Feel free to use it but if you do please share your experiences and let me  know of any optimisations you make in the comments below.

 ALTER PROCEDURE [Clean].[RemoveWhitespace]  
 (  
      @BatchLogID INT,  
      @Database AS VARCHAR(50),  
      @Schema AS VARCHAR(50),  
      @Table AS VARCHAR(50),  
      @Column AS VARCHAR(50),  
      @TrimLeadingWhitespace AS CHAR(1), -- Options are "Y" or "N"  
      @TrimTrailingWhitespace AS CHAR(1), -- Options are "Y" or "N"  
      @PrintOrExecute AS CHAR(1) -- Options are "P" or "E". Print will just show what the sproc intended to clean, execute will actually do it.  
 )  
 AS  
 SET NOCOUNT ON  
 -- SPROC SPECIFIC VARIABLES  
 DECLARE @SQL VARCHAR(4000)  
 DECLARE @MatchingCode VARCHAR(100)  
 DECLARE @PKID VARCHAR(100)  
 DECLARE @MaxID SMALLINT  
 DECLARE @Counter SMALLINT  
 DECLARE @PKString VARCHAR(500)  
 DECLARE @LogLocation VARCHAR(100)  
 DECLARE @ErrorMessage VARCHAR(255)  
 SET @PKString = ''  
 SET @Counter = 0  
 SELECT @LogLocation = ConfigVariable FROM DataWarehouse.Common.Config WHERE ConfigDescription = 'ETL Cleanse Log Location'  
 -- CHECK FOR BATCHLOGID. IF NOT EXISTS THEN INITIATE BATCHLOG  
 IF ISNULL(@BatchLogID,0) = 0  
 BEGIN  
   INSERT INTO DataWarehouse.Audit.BatchLog(StartDatetime,[Status])  
     VALUES (GETDATE(), 'R')  
   SELECT @BatchLogID = SCOPE_IDENTITY()  
 END  
 -- BEGIN PROCESS  
 BEGIN TRY   
      BEGIN TRANSACTION  
           -- DEFINE THE LIKE CLAUSE FOR USE THROUGHOUT THE SPROC  
           SELECT @MatchingCode =   
           CASE WHEN (@TrimLeadingWhitespace = 'Y' AND @TrimTrailingWhitespace = 'N')     THEN @Column + ' LIKE '' %'''  
                WHEN (@TrimLeadingWhitespace = 'N' AND @TrimTrailingWhitespace = 'Y') THEN @Column + ' LIKE ''% '''  
                ELSE @Column + ' LIKE '' %'' OR ' + @Column + ' LIKE ''% ''' END   
           -- DO THE CLEANSE BY UPDATING THE COLUMN  
                -- BEGIN BUILDING THE DYNAMIC SQL TO PERFORM THE CLEAN                 
                SET @SQL = 'UPDATE ' + @Database + '.' + @Schema + '.' + @Table + ' SET ' + @Column + ' = '  
                -- APPEND TRIM FUNCTIONS REQUIRED  
                SELECT @SQL = @SQL +  
                     CASE WHEN (@TrimLeadingWhitespace = 'Y' AND @TrimTrailingWhitespace = 'N')     THEN 'LTRIM(' + @Column + ')'  
                     WHEN (@TrimLeadingWhitespace = 'N' AND @TrimTrailingWhitespace = 'Y') THEN 'RTRIM(' + @Column + ')'  
                     ELSE 'LTRIM(RTRIM(' + @Column + '))' END   
                -- COLLECT PK INFORMATION FOR OUTPUT COLUMNS  
                     -- GET PK FOR SPECIFIED TABLE  
                     EXEC DataWarehouse.Common.GetPrimaryKey @Database,@Schema,@Table,@PKList = @PKID OUTPUT  
                     -- CREATE TEMP TABLE OF PK COLUMNS WITH IDENTITY  
                     DECLARE @PKTable TABLE (ID INT IDENTITY(1,1), Items VARCHAR(100))  
                     -- POPULATE TABLE VARIABLE  
                     INSERT INTO @PKTable (Items) SELECT Items FROM DataWarehouse.Common.Spliter(@PKID,',')   
                     -- GET MAXIMUM ID VALUE  
                     SELECT @MaxID = MAX(ID) FROM @PKTable  
                     -- BUILD THE STRING INTHE TABLE VARIABLE  
                     UPDATE @PKTable SET Items = Items + ' = # + CAST(INSERTED.' + Items + ' AS VARCHAR(30))'  
                     UPDATE @PKTable SET Items = Items + ' + #' WHERE ID <> @MaxID  
                     UPDATE @PKTable SET Items = '(#' + Items WHERE ID = 1  
                     UPDATE @PKTable SET Items = Items + ',' WHERE ID <> @MaxID  
                     UPDATE @PKTable SET Items = Items + ') AS PrimaryKey' WHERE ID = @MaxID  
                     UPDATE @PKTable SET Items = REPLACE(Items,'#',CHAR(39))  
                     -- CONCATENATE THE ROWS INTO A STRING  
                     WHILE(@Counter < @MaxID + 1)  
                          BEGIN  
                               SELECT @PKString = @PKString + Items FROM @PKTable WHERE ID = @Counter  
                               SET @Counter = @Counter + 1       
                          END  
                -- APPEND PK BUILD STRING TO DYNAMIC SQL       
                SET @SQL = @SQL + ' OUTPUT ' + @PKString +', ''' + @Column + ''' AS Column, DELETED.' + @Column + ' AS OldValue, INSERTED.' +  
                     @Column + ' AS NewValue, GETDATE() AS ModifiedDate,''' + CAST(@BatchLogID AS VARCHAR(10)) + ''' AS BatchLogID, ''' +  
                     CAST(OBJECT_NAME(@@PROCID) AS VARCHAR(50)) + ''' AS ProcedureName INTO ' + @LogLocation  
                -- APPEND WHERE CLAUSES TO DYNAMIC SQL       
                SET @SQL = @SQL + ' WHERE ' + @MatchingCode  
                -- PRINT OR EXECUTE DEPENDING ON PARAMETER       
                IF(@PrintOrExecute = 'P')  
                     BEGIN  
                          PRINT (@SQL)  
                     END  
                     ELSE  
                          EXEC (@SQL)  
           -- COMPLETE THE TRANSACTION  
           COMMIT TRANSACTION  
 END TRY   
 -- RETURN ERRORS AND ROLLBACK WHEN ERRORS OCCUR  
 BEGIN CATCH   
      -- ROLLBACK  
      ROLLBACK TRANSACTION  
      -- REPORT ERROR  
      SET @ErrorMessage = ERROR_MESSAGE()       
      RAISERROR (@ErrorMessage,17,1)  
 END CATCH  
 GO  

This is my code for returning a comma separated list containing the primary key columns for any specified table.

 ALTER PROCEDURE [Common].[GetPrimaryKey]  
 (  
      @Database AS VARCHAR(50),  
      @Schema AS VARCHAR(50),  
      @Table AS VARCHAR(50),  
      @PKList AS VARCHAR(200) OUTPUT  
 )  
 AS   
 SET NOCOUNT ON  
 -- DECLARE VARIABLES  
 DECLARE @SQL NVARCHAR(500)  
 -- CATCH ERRORS USING TRY..CATCH  
 BEGIN TRY   
           -- BUILD DYNAMIC SQL TO QUERY SYS TABLES AND RETURN THE PK FOR SPECIFIED TABLE IN A CSV LIST  
      SET @SQL = 'USE ' + @Database + '  
                SELECT @PKList = COALESCE(@PKList + '','','''') + CAST(CCU.COLUMN_NAME AS VARCHAR(20))  
                FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS TC  
                JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE CCU ON TC.CONSTRAINT_NAME = CCU.CONSTRAINT_NAME  
                WHERE TC.CONSTRAINT_TYPE = ''Primary Key''   
                AND TC.TABLE_CATALOG = ''' + @Database + '''  
                AND TC.TABLE_SCHEMA = ''' + @Schema + '''  
                AND TC.TABLE_NAME = ''' + @Table + ''''  
      -- EXECUTE THE DYNAMIC SQL  
      EXEC SP_ExecuteSQL @SQL, N'@PKList VARCHAR(200) OUTPUT', @PKList OUTPUT  
 END TRY  
 -- RETURN ERRORS AND ROLLBACK WHEN ERRORS OCCUR  
 BEGIN CATCH   
      -- REPORT ERRORS  
      SELECT ERROR_NUMBER() ErrorNumber, ERROR_SEVERITY() ErrorSeverity, ERROR_LINE() ErrorLine, ERROR_MESSAGE() ErrorMessage  
 END CATCH  
 GO  

Here is the code for pivoting a comma separated list into a table for use in the creation of a primary key list with values appended into the string.

 ALTER FUNCTION [Common].[Spliter](@String VARCHAR(8000), @Delimiter CHAR(1))      
 RETURNS @TempTable TABLE (Items VARCHAR(8000))      
 AS   
 BEGIN  
   DECLARE @IDX INT      
   DECLARE @Slice VARCHAR(8000)      
   SELECT @IDX = 1      
     IF LEN(@String)<1 OR @String IS NULL RETURN  
   WHILE @IDX!= 0      
   BEGIN  
     SET @IDX = CHARINDEX(@Delimiter,@String)      
     IF @IDX!=0      
                SET @Slice = LEFT(@String,@IDX - 1)      
           ELSE  
                SET @Slice = @String      
     IF(LEN(@Slice)>0)   
       INSERT INTO @Temptable(Items) VALUES(@Slice)      
     SET @String = RIGHT(@String,LEN(@String) - @IDX)      
     IF LEN(@String) = 0 BREAK  
   END  
 RETURN     
 END  
 GO  

No comments:

Post a Comment