ploomber.clients.SQLAlchemyClient

class ploomber.clients.SQLAlchemyClient(uri, split_source='default', create_engine_kwargs=None)

Client for connecting to any database supported by SQLAlchemy

Parameters
  • uri (str or sqlalchemy.engine.url.URL) – URI to pass to sqlalchemy.create_engine or URL object created using sqlalchemy.engine.url.URL.create

  • split_source (str, optional) – Some database drivers do not support multiple commands in a single execute statement. Use this option to split commands by a given character (e.g. ‘;’) and send them one at a time. Defaults to ‘default’, which splits by ‘;’ when using a SQLite database and performs no splitting with other databases. If None, the code is never split. Any other string value is used as the token for splitting statements, regardless of the database type

  • create_engine_kwargs (dict, optional) – Keyword arguments to pass to sqlalchemy.create_engine
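
The three split_source modes described above can be sketched in plain Python. This is only an illustration, not the library's implementation: split_statements and its dialect argument are hypothetical names introduced here, and the splitting is deliberately naive.

```python
def split_statements(code, split_source="default", dialect="sqlite"):
    """Hypothetical helper mirroring the documented split_source modes."""
    if split_source == "default":
        # Documented default: split on ';' for SQLite, no splitting otherwise
        token = ";" if dialect == "sqlite" else None
    else:
        # None disables splitting; any other string is used as the token
        token = split_source

    if token is None:
        return [code]

    # Naive split: ignores tokens that appear inside string literals
    return [part.strip() for part in code.split(token) if part.strip()]


script = "CREATE TABLE t (x INTEGER); INSERT INTO t VALUES (1);"

print(split_statements(script))                        # default mode, SQLite
print(split_statements(script, dialect="postgresql"))  # default mode, other database
print(split_statements(script, split_source=None))     # never split
```

With the default setting, only the SQLite call returns two separate statements; the other two calls return the script unchanged as a single item.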

Notes

The SQLite client does not support sending more than one command at a time. When using this backend, the code is split and sent to the database in several calls.
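
The limitation can be reproduced with the standard library sqlite3 module alone, without Ploomber. The sketch below assumes an in-memory database and a naive split on ‘;’; the table name and the variable names are chosen for illustration only.

```python
import sqlite3

script = (
    "CREATE TABLE numbers (a INTEGER); "
    "INSERT INTO numbers VALUES (1); "
    "INSERT INTO numbers VALUES (2)"
)

# Attempt 1: send the whole script in a single execute call
con = sqlite3.connect(":memory:")
try:
    con.execute(script)
    single_call_ok = True
except (sqlite3.Warning, sqlite3.ProgrammingError):
    # The exception class depends on the Python version
    single_call_ok = False
finally:
    con.close()

# Attempt 2: fresh connection, one execute call per statement
con = sqlite3.connect(":memory:")
for statement in script.split(";"):
    if statement.strip():
        con.execute(statement)
rows = con.execute("SELECT a FROM numbers ORDER BY a").fetchall()
con.close()

print(single_call_ok)
print(rows)
```

The single call is rejected by the driver, while the statement-by-statement loop creates the table and inserts both rows.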

Examples

Spec API:

Given the following clients.py:

import sqlalchemy
from ploomber.clients import SQLAlchemyClient

def get():
    url = sqlalchemy.engine.url.URL.create(drivername='sqlite',
                                           database='my_db.db')
    return SQLAlchemyClient(url)

Spec API (dag-level client):

clients:
    # key is a task class such as SQLDump or SQLScript
    SQLDump: clients.get

tasks:
    - source: query.sql
      product: output/data.csv

Spec API (task-level client):

tasks:
    - source: query.sql
      product: output/data.csv
      client: clients.get

Python API (dag-level client):

>>> import sqlite3
>>> import sqlalchemy
>>> import pandas as pd
>>> from ploomber import DAG
>>> from ploomber.products import File
>>> from ploomber.tasks import SQLDump
>>> from ploomber.clients import SQLAlchemyClient
>>> con_raw = sqlite3.connect(database='my.db')
>>> df = pd.DataFrame({'a': range(100), 'b': range(100)})
>>> _ = df.to_sql('numbers', con_raw, index=False)
>>> con_raw.close()
>>> dag = DAG()
>>> url = sqlalchemy.engine.url.URL.create(drivername='sqlite',
...                                        database='my.db')
>>> client = SQLAlchemyClient(url)
>>> dag.clients[SQLDump] = client # dag-level client
>>> _ = SQLDump('SELECT * FROM numbers', File('data.parquet'),
...             dag=dag, name='dump',
...             chunksize=None) # no need to pass client here
>>> _ = dag.build()
>>> df = pd.read_parquet('data.parquet')
>>> df.head(3)
   a  b
0  0  0
1  1  1
2  2  2

Python API (task-level client):

>>> import sqlite3
>>> import sqlalchemy
>>> import pandas as pd
>>> from ploomber import DAG
>>> from ploomber.products import File
>>> from ploomber.tasks import SQLDump
>>> from ploomber.clients import SQLAlchemyClient
>>> con_raw = sqlite3.connect(database='some.db')
>>> df = pd.DataFrame({'a': range(100), 'b': range(100)})
>>> _ = df.to_sql('numbers', con_raw, index=False)
>>> con_raw.close()
>>> dag = DAG()
>>> url = sqlalchemy.engine.url.URL.create(drivername='sqlite',
...                                        database='some.db')
>>> client = SQLAlchemyClient(url)
>>> _ = SQLDump('SELECT * FROM numbers', File('data.parquet'),
...             dag=dag, name='dump',
...             client=client, # pass client to task
...             chunksize=None)
>>> _ = dag.build()
>>> df = pd.read_parquet('data.parquet')
>>> df.head(3)
   a  b
0  0  0
1  1  1
2  2  2

See also

ploomber.clients.DBAPIClient

A client to connect to a database

Methods

close()

Closes all connections

cursor()

execute(code)

Execute code

Attributes

connection

Return a connection from the pool

engine

Returns a SQLAlchemy engine

split_source_mapping