# Code-viewer listing header (extraction residue): Python, 292 lines, 8.7 KiB.
import os
import warnings
from datetime import datetime

import clickhouse_connect
import pandas as pd
import pyodbc

# ---------------------------------------------------
# Suppress Pandas Warning
# ---------------------------------------------------
# Ignore any warning whose message begins with the text below (the
# `message` argument is a regex matched against the start of the text).
# NOTE(review): presumably this is the warning pandas raises when
# read_sql is handed the raw pyodbc connection used later — confirm.
warnings.filterwarnings(
    action='ignore',
    message='pandas only supports SQLAlchemy connectable',
)

# Start-of-run banner with the current local timestamp.
print(f"SCHEMA Migration Started : {datetime.now()}")

# ---------------------------------------------------
# SQL Server Connection
# ---------------------------------------------------
# ODBC connection string handed to pyodbc.connect().
#
# SECURITY: the server address, login and password used to be hard-coded
# here. Each one can now be supplied through an environment variable
# (MSSQL_SERVER, MSSQL_DATABASE, MSSQL_USER, MSSQL_PASSWORD). The old
# literals remain ONLY as fallbacks so existing runs keep working:
# TODO(owner): rotate the exposed password and delete the fallbacks.
#
# Values are inserted verbatim; a value containing ';' would need ODBC
# brace-quoting before being exported.
SQL_CONN_STR = (
    'DRIVER={ODBC Driver 17 for SQL Server};'
    f"SERVER={os.environ.get('MSSQL_SERVER', '10.200.25.65')};"
    f"DATABASE={os.environ.get('MSSQL_DATABASE', 'CPMIndiaBusinessInsight')};"
    f"UID={os.environ.get('MSSQL_USER', 'bsgteam_test')};"
    f"PWD={os.environ.get('MSSQL_PASSWORD', 'B$gt3@m#00512')};"
    'TrustServerCertificate=yes;'
)

# ---------------------------------------------------
# ClickHouse Connection
# ---------------------------------------------------
# Keyword arguments for clickhouse_connect.get_client(); 'database' is
# also used as the target database name in the generated DDL.
#
# SECURITY: host and password used to be hard-coded here. Each entry can
# now be overridden through an environment variable (CLICKHOUSE_HOST,
# CLICKHOUSE_PORT, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD,
# CLICKHOUSE_DATABASE). The old literals remain ONLY as fallbacks:
# TODO(owner): rotate the exposed password and delete the fallbacks.
CH_CONFIG = {
    'host': os.environ.get('CLICKHOUSE_HOST', '172.188.12.194'),
    # Environment values are strings; the port stays an int as before.
    'port': int(os.environ.get('CLICKHOUSE_PORT', '8123')),
    'username': os.environ.get('CLICKHOUSE_USER', 'default'),
    'password': os.environ.get('CLICKHOUSE_PASSWORD', 'dipanshu_k'),
    'database': os.environ.get('CLICKHOUSE_DATABASE', 'DaburIndia_BI'),
}

# ---------------------------------------------------
# SQL Server → ClickHouse Datatype Mapping
# ---------------------------------------------------
# Keys are lower-case SQL Server DATA_TYPE names; values are the
# ClickHouse type used for the column. Types missing from this table are
# created as String by the lookup further down.
DATATYPE_MAPPING = {

    # Integers
    'bigint': 'Int64',
    'int': 'Int32',
    'smallint': 'Int16',
    # SQL Server tinyint is unsigned (0-255); signed Int8 stops at 127.
    'tinyint': 'UInt8',
    'bit': 'UInt8',

    # Floating point / fixed point
    # NOTE(review): Float64 cannot hold every decimal/numeric/money value
    # exactly; switch to Decimal(P, S) if exact amounts matter.
    'float': 'Float64',
    'real': 'Float32',
    'decimal': 'Float64',
    'numeric': 'Float64',
    'money': 'Float64',
    # Kept consistent with 'money' instead of falling back to String.
    'smallmoney': 'Float64',

    # Character data
    'varchar': 'String',
    'nvarchar': 'String',
    'char': 'String',
    'nchar': 'String',
    'text': 'String',
    'ntext': 'String',
    'xml': 'String',

    # Dates and timestamps
    'date': 'Date32',
    'datetime': 'DateTime64',
    'datetime2': 'DateTime64',
    'smalldatetime': 'DateTime64',

    # Stored as text
    'time': 'String',
    'uniqueidentifier': 'String',
    'binary': 'String',
    'varbinary': 'String',
}

# ---------------------------------------------------
# Migration Settings and Helpers
# ---------------------------------------------------
# Only tables (and their columns) from this SQL Server schema are migrated.
_SOURCE_SCHEMA = 'dbo'

_DEBUG_LOG_PATH = "clickhouse_schema_debug.log"
_ERROR_LOG_PATH = "clickhouse_schema_error.log"

_BANNER = "==================================="
_LOG_RULE = "====================================="

# Characters removed before a name is wrapped in backticks in the DDL.
_TABLE_NAME_STRIP = str.maketrans('', '', '`[]')
_COLUMN_NAME_STRIP = str.maketrans('', '', '`')

_TABLE_LIST_QUERY = """
    SELECT TABLE_NAME
    FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_TYPE = 'BASE TABLE'
      AND TABLE_SCHEMA = ?
    ORDER BY TABLE_NAME
"""

# Filtered on the schema as well as the table name: without the schema
# filter, a same-named table in another schema would contribute its
# columns too and the generated DDL would contain duplicates.
_TABLE_SCHEMA_QUERY = """
    SELECT
        COLUMN_NAME,
        DATA_TYPE,
        IS_NULLABLE
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE TABLE_SCHEMA = ?
      AND TABLE_NAME = ?
    ORDER BY ORDINAL_POSITION
"""


def _clickhouse_column(column_name, sql_type, is_nullable, type_map=None):
    """Return one ClickHouse column definition, e.g. "`Id` Nullable(Int32)".

    column_name: source column name; backticks are removed from it.
    sql_type:    SQL Server DATA_TYPE, matched case-insensitively.
    is_nullable: IS_NULLABLE value; 'YES' wraps the type in Nullable().
    type_map:    SQL Server -> ClickHouse type table; defaults to the
                 module-level DATATYPE_MAPPING. Unknown types become String.
    """
    if type_map is None:
        type_map = DATATYPE_MAPPING

    name = str(column_name).translate(_COLUMN_NAME_STRIP)
    ch_type = type_map.get(str(sql_type).lower(), 'String')

    if str(is_nullable).strip().upper() == 'YES':
        ch_type = f'Nullable({ch_type})'

    return f"`{name}` {ch_type}"


def _build_create_table_sql(database, table, column_defs):
    """Return the CREATE TABLE IF NOT EXISTS statement for one table."""
    return f"""
    CREATE TABLE IF NOT EXISTS `{database}`.`{table}`
    (
        {', '.join(column_defs)}
    )
    ENGINE = MergeTree()
    ORDER BY tuple()
    """


def _append_log(path, table, body):
    """Append one ruled 'TABLE : ...' entry containing body to the log at path."""
    with open(path, "a", encoding="utf-8") as log_file:
        log_file.write(f"\n\n{_LOG_RULE}\n")
        log_file.write(f"TABLE : {table}\n")
        log_file.write(body)
        log_file.write(f"{_LOG_RULE}\n")


# ---------------------------------------------------
# Run Migration
# ---------------------------------------------------
# Pre-bound so the cleanup in `finally` can tell "never opened" apart
# from "opened" without relying on a swallowed NameError.
sql_conn = None
ch_client = None
failed_tables = []

try:

    # ---------------------------------------------------
    # Connect SQL Server
    # ---------------------------------------------------
    sql_conn = pyodbc.connect(SQL_CONN_STR)
    print("Connected to SQL Server")

    # ---------------------------------------------------
    # Connect ClickHouse
    # ---------------------------------------------------
    # The client is opened WITHOUT a default database so the connection
    # does not depend on the target database already existing; every
    # statement below names the database explicitly.
    target_database = CH_CONFIG['database']
    connect_args = {
        key: value for key, value in CH_CONFIG.items() if key != 'database'
    }
    ch_client = clickhouse_connect.get_client(**connect_args)
    print("Connected to ClickHouse")

    # ---------------------------------------------------
    # Create Database if Not Exists
    # ---------------------------------------------------
    ch_client.command(f"CREATE DATABASE IF NOT EXISTS `{target_database}`")
    print(f"Database Verified : {target_database}")

    # ---------------------------------------------------
    # Get All SQL Server Tables
    # ---------------------------------------------------
    tables_df = pd.read_sql(
        _TABLE_LIST_QUERY,
        sql_conn,
        params=[_SOURCE_SCHEMA],
    )
    tables = tables_df['TABLE_NAME'].tolist()
    print(f"Total Tables Found : {len(tables)}")

    # ---------------------------------------------------
    # Process Each Table
    # ---------------------------------------------------
    for table_name in tables:

        # A failure is logged and recorded per table so one bad table
        # does not stop the rest of the run.
        try:
            print(f"\n{_BANNER}")
            print(f"Creating Table : {table_name}")
            print(_BANNER)

            # Name used on the ClickHouse side (inside backticks).
            safe_table_name = table_name.translate(_TABLE_NAME_STRIP)

            # Look the columns up by the REAL SQL Server name; the
            # sanitized name would not match a table whose name contains
            # one of the stripped characters.
            schema_df = pd.read_sql(
                _TABLE_SCHEMA_QUERY,
                sql_conn,
                params=[_SOURCE_SCHEMA, table_name],
            )

            if schema_df.empty:
                print(f"No Columns Found : {safe_table_name}")
                continue

            columns = [
                _clickhouse_column(col_name, sql_type, nullable)
                for col_name, sql_type, nullable in zip(
                    schema_df['COLUMN_NAME'],
                    schema_df['DATA_TYPE'],
                    schema_df['IS_NULLABLE'],
                )
            ]

            create_table_query = _build_create_table_sql(
                target_database,
                safe_table_name,
                columns,
            )

            print("\nGenerated CREATE TABLE SQL:\n")
            print(create_table_query)

            # Keep a copy of every generated statement for debugging.
            _append_log(
                _DEBUG_LOG_PATH,
                safe_table_name,
                f"{create_table_query}\n",
            )

            ch_client.command(create_table_query)
            print(f"Table Created Successfully : {safe_table_name}")

        except Exception as table_error:
            failed_tables.append(table_name)

            print(f"\n{_BANNER}")
            print(f"FAILED TABLE : {table_name}")
            print("ERROR :", str(table_error))
            print(_BANNER)

            _append_log(
                _ERROR_LOG_PATH,
                table_name,
                f"ERROR : {table_error}\n",
            )

    # ---------------------------------------------------
    # Summary
    # ---------------------------------------------------
    print(f"\n{_BANNER}")
    if failed_tables:
        # Do not claim success when some tables could not be created.
        print(
            f"COMPLETED WITH {len(failed_tables)} FAILED TABLE(S) : "
            f"{', '.join(failed_tables)}"
        )
    else:
        print("ALL TABLE STRUCTURES CREATED")
    print(_BANNER)

except Exception as e:

    print(f"\n{_BANNER}")
    print("MAIN ERROR :", str(e))
    print(_BANNER)

finally:

    # ---------------------------------------------------
    # Close Connections
    # ---------------------------------------------------
    # Still best-effort: a failing close() is reported, never raised.
    for label, handle in (("SQL Server", sql_conn), ("ClickHouse", ch_client)):
        if handle is None:
            continue
        try:
            handle.close()
            print(f"{label} Connection Closed")
        except Exception as close_error:
            print(f"Could not close {label} connection :", close_error)

print("Finished :", datetime.now())