Connect to DB2 from python with SQLAlchemy
This is kind of a sister post to my Databricks-specific post from the other day.
It's amazing how much time you can spend searching through docs, fiddling with connection strings, and trying different engines because some <cough>
IBM </cough>
don't seem to be working very well or the docs aren't quite up to date or whatever.
Maybe there are other people that need to use DB2 for whatever godawful reason. Maybe those people want to start using Python or Airflow or something. Maybe those people are just me six months from now. Here is what got everything working.
TLDR; Use pyodbc with ibm_db_sa
. The connection string should look like
'ibm_db_sa+pyodbc400://{username}:{password}@{host}:{port}/{database};currentSchema={schema}'
Now for the long form answer...
# requirements.txt
ibm_db
ibm_db_sa
pyodbc
SQLAlchemy
# database_engine.py
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
def create_database_engine():
connection_string = 'ibm_db_sa+pyodbc400://{username}:{password}@{host}:{port}/{database};currentSchema={schema}'.format(
username='',
password='',
host='',
port='',
database='',
schema=''
)
return create_engine(connection_string)
engine = create_database_engine()
def create_session():
Session = sessionmaker(bind=engine)
return Session()
@contextmanager
def session_scope():
"""Provide a transactional scope around a series of operations."""
session = create_session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
Here is the model:
# models.py
from sqlalchemy import Column, Integer, String, DateTime, Text, MetaData
from sqlalchemy.ext.declarative import declarative_base
from database_engine import engine
metadata = MetaData(schema='Restaurant')
Base = declarative_base(bind=engine, metadata=metadata)
class Transaction(Base):
__tablename__ = 'Transaction'
id = Column('ID', String(100), primary_key=True)
store_id = Column('STORE_ID', Integer)
created_time = Column('CREATED_TIME', DateTime)
transaction_json = Column('TXN_JSON', Text)
And a sample use:
from database_engine import session_scope
from models import Transaction
if __name__ == '__main__':
with session_scope() as session:
results = session.query(Transaction).limit(10)
for result in results:
print(result.id)