import pyodbc import pandas as pd import clickhouse_connect from datetime import datetime import warnings # --------------------------------------------------- # Suppress Pandas Warning # --------------------------------------------------- warnings.filterwarnings( 'ignore', 'pandas only supports SQLAlchemy connectable' ) print("SCHEMA Migration Started :", datetime.now()) # --------------------------------------------------- # SQL Server Connection # --------------------------------------------------- SQL_CONN_STR = ( 'DRIVER={ODBC Driver 17 for SQL Server};' 'SERVER=10.200.25.65;' 'DATABASE=CPMIndiaBusinessInsight;' 'UID=bsgteam_test;' 'PWD=B$gt3@m#00512;' 'TrustServerCertificate=yes;' ) # --------------------------------------------------- # ClickHouse Connection # --------------------------------------------------- CH_CONFIG = { 'host': '172.188.12.194', 'port': 8123, 'username': 'default', 'password': 'dipanshu_k', 'database': 'DaburIndia_BI' } # --------------------------------------------------- # SQL Server → ClickHouse Datatype Mapping # --------------------------------------------------- DATATYPE_MAPPING = { 'bigint': 'Int64', 'int': 'Int32', 'smallint': 'Int16', 'tinyint': 'Int8', 'bit': 'UInt8', 'float': 'Float64', 'real': 'Float32', 'decimal': 'Float64', 'numeric': 'Float64', 'money': 'Float64', 'varchar': 'String', 'nvarchar': 'String', 'char': 'String', 'nchar': 'String', 'text': 'String', 'ntext': 'String', 'xml': 'String', 'date': 'Date32', 'datetime': 'DateTime64', 'datetime2': 'DateTime64', 'smalldatetime': 'DateTime64', 'time': 'String', 'uniqueidentifier': 'String', 'binary': 'String', 'varbinary': 'String' } try: # --------------------------------------------------- # Connect SQL Server # --------------------------------------------------- sql_conn = pyodbc.connect(SQL_CONN_STR) print("Connected to SQL Server") # --------------------------------------------------- # Connect ClickHouse # --------------------------------------------------- ch_client = clickhouse_connect.get_client(**CH_CONFIG) print("Connected to ClickHouse") # --------------------------------------------------- # Create Database if Not Exists # --------------------------------------------------- create_database_query = f""" CREATE DATABASE IF NOT EXISTS `{CH_CONFIG['database']}` """ ch_client.command(create_database_query) print(f"Database Verified : {CH_CONFIG['database']}") # --------------------------------------------------- # Get All SQL Server Tables # --------------------------------------------------- table_query = """ SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_SCHEMA = 'dbo' ORDER BY TABLE_NAME """ tables_df = pd.read_sql(table_query, sql_conn) tables = tables_df['TABLE_NAME'].tolist() print(f"Total Tables Found : {len(tables)}") # --------------------------------------------------- # Process Each Table # --------------------------------------------------- for table_name in tables: try: print("\n===================================") print(f"Creating Table : {table_name}") print("===================================") # --------------------------------------------------- # Safe Table Name # --------------------------------------------------- safe_table_name = ( table_name .replace('`', '') .replace('[', '') .replace(']', '') ) # --------------------------------------------------- # Get Table Schema # --------------------------------------------------- schema_query = """ SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ? ORDER BY ORDINAL_POSITION """ schema_df = pd.read_sql( schema_query, sql_conn, params=[safe_table_name] ) # --------------------------------------------------- # Skip Empty Schema # --------------------------------------------------- if schema_df.empty: print(f"No Columns Found : {safe_table_name}") continue # --------------------------------------------------- # Build ClickHouse Columns # --------------------------------------------------- columns = [] for _, row in schema_df.iterrows(): col_name = str(row['COLUMN_NAME']).replace('`', '') sql_type = str(row['DATA_TYPE']).lower() nullable = str(row['IS_NULLABLE']) # --------------------------------------------------- # Get ClickHouse Datatype # --------------------------------------------------- ch_type = DATATYPE_MAPPING.get( sql_type, 'String' ) # --------------------------------------------------- # Nullable Handling # --------------------------------------------------- if nullable == 'YES': ch_type = f'Nullable({ch_type})' columns.append( f"`{col_name}` {ch_type}" ) # --------------------------------------------------- # Generate CREATE TABLE Query # --------------------------------------------------- create_table_query = f""" CREATE TABLE IF NOT EXISTS `{CH_CONFIG['database']}`.`{safe_table_name}` ( {', '.join(columns)} ) ENGINE = MergeTree() ORDER BY tuple() """ # --------------------------------------------------- # Print SQL # --------------------------------------------------- print("\nGenerated CREATE TABLE SQL:\n") print(create_table_query) # --------------------------------------------------- # Save SQL Log # --------------------------------------------------- with open( "clickhouse_schema_debug.log", "a", encoding="utf-8" ) as log_file: log_file.write("\n\n=====================================\n") log_file.write(f"TABLE : {safe_table_name}\n") log_file.write(create_table_query) log_file.write("\n=====================================\n") # --------------------------------------------------- # Execute CREATE TABLE # --------------------------------------------------- ch_client.command(create_table_query) print(f"Table Created Successfully : {safe_table_name}") except Exception as table_error: print("\n===================================") print(f"FAILED TABLE : {table_name}") print("ERROR :", str(table_error)) print("===================================") # --------------------------------------------------- # Error Logging # --------------------------------------------------- with open( "clickhouse_schema_error.log", "a", encoding="utf-8" ) as error_log: error_log.write("\n\n=====================================\n") error_log.write(f"TABLE : {table_name}\n") error_log.write(f"ERROR : {str(table_error)}\n") error_log.write("=====================================\n") continue print("\n===================================") print("ALL TABLE STRUCTURES CREATED") print("===================================") except Exception as e: print("\n===================================") print("MAIN ERROR :", str(e)) print("===================================") finally: # --------------------------------------------------- # Close Connections # --------------------------------------------------- try: sql_conn.close() print("SQL Server Connection Closed") except: pass try: ch_client.close() print("ClickHouse Connection Closed") except: pass print("Finished :", datetime.now())