Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.env
/scripts
/backup
/backup
**/__pycache__/*
**/data
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,44 @@
# postgres_docker
Local setup of Postgres on Docker Compose

### Run Metabase Configurator
1. Run `docker exec dbt python metabase_setup.py`



**DOCKER EXEC À EXÉCUTER SUR UN TERMINAL HORS ATTACH SHELL**

### Run dbt-docs website
1. Run `docker exec -d dbt bash serve_dbt_docs.sh`
You can then access the docs website on http://localhost:4444

## codegen dbt
toutes ces commandes sont à lancer depuis la baleine postgres dbt attach shell.
une fois dans le conteneur, `cd demo_project`

### generate source yml pour stg (script1)
`dbt run-operation generate_source --args '{"schema_name": "stg","generate_columns":True}'`

### generate script sql d'une table (script2)
`dbt run-operation generate_base_model --args '{"source_name": "stg", "table_name": "immats"}'`

### Dbt run hors attach shell
1. Run `docker exec dbt bash run_dbt.sh`

### generate yml d'une table (script3)
`dbt run-operation generate_model_yaml --args '{"model_name": "immats_ods"}'`

## Étapes quand on a une nouvelle source
*prérequis : avoir développé et lancé un flux "EL"*
1. run (script 1) et copier coller les lignes de la nouvelle source dans stg.yml
2. run (script 2) avec comme valeur pour table_name le nom de la nouvelle table. puis copier coller le contenu dans un fichier models/ods/nom_du_dossier/nom_de_la_table_ods.sql
3. créer la nouvelle table en lançant `dbt run`
4. run (script 3) puis copier coller le contenu dans model/ods/nom_du_dossier/nom_du_dossier.yml
5. pour créer la vue créer un fichier model/prs/nom_du_dossier/nom_table_prs.sql et mettre dedans ce code en changeant le nom de la table `select * from {{ ref('immats_vrp_ods') }}`
6. dbt run

## dbt deps sert à installer des dépendances

## dbt clean sert à les supprimer

## ./demo_project/dbt_modules si le dossier existe tout va bien sinon dbt deps dans cd demo_project
25 changes: 0 additions & 25 deletions datafuel.docker-compose.yml

This file was deleted.

14 changes: 14 additions & 0 deletions dbt_repo/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Base image: official CPython 3.8 (Debian-based).
FROM python:3.8

# set the working directory in the image/container to /src
WORKDIR /src

# Add /src to PYTHONPATH
# /src and /src_metabt are expected to hold the project code — presumably
# bind-mounted or copied by docker-compose at run time (TODO confirm:
# nothing below copies them into the image).
ENV PYTHONPATH "$PYTHONPATH:/src:/src_metabt"
# Also expose the same directories on PATH so scripts there can be run by name.
ENV PATH "$PATH:/src:/src_metabt"

# copy the dependencies file to the working directory
COPY requirements.txt .

# install dependencies
RUN pip install -r requirements.txt
84 changes: 84 additions & 0 deletions dbt_repo/metabt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import logging

from .dbt import DbtReader
from .metabase import MetabaseClient, AdminClient
from .configurator_settings import (
DWH_POSTGRES_ADMIN, DWH_POSTGRES_PASSWORD, DWH_POSTGRES_DB,
DWH_POSTGRES_HOST, MB_ADMIN_FIRST_NAME, MB_ADMIN_LAST_NAME,
MB_ADMIN_EMAIL, MB_ADMIN_PASSWORD, SETUP_TOKEN_URL,
SETUP_ADMIN_URL, SETUP_DATABASE_URL, NEW_USER_URL
)

__version__ = '0.5.1'

def export(dbt_path: str,
           mb_host: str, mb_user: str, mb_password: str,
           database: str,
           mb_https: bool = True, sync: bool = True, sync_timeout: int = 30,
           includes=None, excludes=None):
    """Exports models from dbt to Metabase.

    Arguments:
        dbt_path {str} -- Path to dbt project.
        mb_host {str} -- Metabase hostname.
        mb_user {str} -- Metabase username.
        mb_password {str} -- Metabase password.
        database {str} -- Target database name.

    Keyword Arguments:
        mb_https {bool} -- Use HTTPS to connect to Metabase instead of HTTP. (default: {True})
        sync {bool} -- Synchronize Metabase database before export. (default: {True})
        sync_timeout {int} -- Synchronization timeout in seconds. (default: {30})
        includes {list} -- Model names to limit processing to. (default: {None} = all models)
        excludes {list} -- Model names to exclude. (default: {None} = none)
    """

    # None-defaults instead of mutable [] defaults, which are shared
    # between calls and can accumulate state.
    includes = [] if includes is None else includes
    excludes = [] if excludes is None else excludes

    mbc = MetabaseClient(mb_host, mb_user, mb_password, mb_https)
    # Forward the filters: previously these were commented out, so the
    # --includes/--excludes CLI options were silently ignored.
    models = DbtReader(dbt_path).read_models(
        includes=includes,
        excludes=excludes
    )
    if sync:
        if not mbc.sync_and_wait(database, models, sync_timeout):
            logging.critical("Sync timeout reached, models still not compatible")
            return

    mbc.export_models(database, models)

def main(args: list = None):
    """CLI entry point: parses arguments and dispatches to export().

    Arguments:
        args {list} -- Argument list to parse; None means sys.argv. (default: {None})
    """
    import argparse

    def _str2bool(value: str) -> bool:
        # Fix: the original used type=bool, but bool("False") is True —
        # argparse passed every non-empty string through bool(), so
        # "--mb_https False" and "--sync False" were silently ignored.
        return value.strip().lower() not in ('false', '0', 'no', 'n', 'f', '')

    logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

    parser = argparse.ArgumentParser(
        description='Model synchronization from dbt to Metabase.'
    )
    parser.add_argument('command', choices=['export'], help="command to execute")
    parser.add_argument('--dbt_path', metavar='PATH', required=True, help="path to dbt project")
    parser.add_argument('--mb_host', metavar='HOST', required=True, help="Metabase hostname")
    parser.add_argument('--mb_user', metavar='USER', required=True, help="Metabase username")
    parser.add_argument('--mb_password', metavar='PASS', required=True, help="Metabase password")
    parser.add_argument('--mb_https', metavar='HTTPS', type=_str2bool, default=True, help="use HTTPS to connect to Metabase instead of HTTP")
    parser.add_argument('--database', metavar='DB', required=True, help="target database name")
    parser.add_argument('--sync', metavar='ENABLE', type=_str2bool, default=True, help="synchronize Metabase database before export")
    parser.add_argument('--sync_timeout', metavar='SECS', type=int, default=30, help="synchronization timeout (in secs)")
    parser.add_argument('--includes', metavar='MODELS', nargs='*', default=[], help="model names to limit processing to")
    parser.add_argument('--excludes', metavar='MODELS', nargs='*', default=[], help="model names to exclude")
    parsed = parser.parse_args(args=args)

    if parsed.command == 'export':
        export(
            dbt_path=parsed.dbt_path,
            mb_host=parsed.mb_host,
            mb_user=parsed.mb_user,
            mb_password=parsed.mb_password,
            mb_https=parsed.mb_https,
            database=parsed.database,
            sync=parsed.sync,
            sync_timeout=parsed.sync_timeout,
            includes=parsed.includes,
            excludes=parsed.excludes
        )
23 changes: 23 additions & 0 deletions dbt_repo/metabt/configurator_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import os

# Postgres data-warehouse credentials, read from the environment.
# os.environ[...] (rather than .get) raises KeyError at import time,
# failing fast if a variable is missing from the container environment.
DWH_POSTGRES_ADMIN = os.environ['DWH_POSTGRES_ADMIN']
DWH_POSTGRES_PASSWORD = os.environ['DWH_POSTGRES_PASSWORD']
DWH_POSTGRES_DB = os.environ['DWH_POSTGRES_DB']
DWH_POSTGRES_HOST = os.environ['DWH_POSTGRES_HOST']


# Credentials for the initial Metabase admin account created by the setup script.
MB_ADMIN_FIRST_NAME = os.environ['MB_ADMIN_FIRST_NAME']
MB_ADMIN_LAST_NAME = os.environ['MB_ADMIN_LAST_NAME']
MB_ADMIN_EMAIL = os.environ['MB_ADMIN_EMAIL']
MB_ADMIN_PASSWORD = os.environ['MB_ADMIN_PASSWORD']

# Metabase REST API host and endpoint paths.
# NOTE(review): 'metabase' is presumably the docker-compose service name — confirm.
METABASE_HOST='http://metabase:3000'
SETUP_TOKEN_ENDPOINT='api/session/properties'
SETUP_ADMIN_ENDPOINT='api/setup'
SETUP_DATABASE_ENDPOINT='api/database'
NEW_USER_ENDPOINT='api/user'

# Fully-qualified URLs used by the configurator clients.
SETUP_TOKEN_URL = f'{METABASE_HOST}/{SETUP_TOKEN_ENDPOINT}'
SETUP_ADMIN_URL = f'{METABASE_HOST}/{SETUP_ADMIN_ENDPOINT}'
SETUP_DATABASE_URL = f'{METABASE_HOST}/{SETUP_DATABASE_ENDPOINT}'
NEW_USER_URL = f'{METABASE_HOST}/{NEW_USER_ENDPOINT}'
148 changes: 148 additions & 0 deletions dbt_repo/metabt/dbt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import logging
import yaml
import re
from pathlib import Path

# Allowed metabase.* fields in a column's `meta` mapping
_META_FIELDS = [
    'special_type',
    'visibility_type'
]

# Database schema names, derived from the model file's folder under models/
STG_SCHEMA = 'stg'
ODS_SCHEMA = 'ods'
PRS_SCHEMA = 'prs'


class DbtReader:
    """Reader for dbt project configuration.

    Walks the project's ``models`` directory, parses the schema YAML files
    and converts model/column metadata into the dict format expected by the
    Metabase client.
    """

    def __init__(self, project_path: str):
        """Constructor.

        Arguments:
            project_path {str} -- Path to dbt project root.
        """

        self.project_path = project_path

    def read_models(self, includes=None, excludes=None) -> list:
        """Reads dbt models in Metabase-friendly format.

        Keyword Arguments:
            includes {list} -- Model names to limit processing to. (default: {None} = all)
            excludes {list} -- Model names to exclude. (default: {None} = none)

        Returns:
            list -- List of dbt models in Metabase-friendly format.
        """

        # None-defaults instead of mutable [] defaults shared between calls.
        includes = [] if includes is None else includes
        excludes = [] if excludes is None else excludes

        mb_models = []

        for path in (Path(self.project_path) / 'models').rglob('*.yml'):
            schema = self.get_schema_from_path(path)
            with open(path, 'r') as stream:
                # safe_load returns None for an empty file; fall back to {}
                db_dict = yaml.safe_load(stream) or {}
            for model in db_dict.get('models', []):
                name = model['name']
                if (not includes or name in includes) and (name not in excludes):
                    mb_models.append(self.read_model(model, schema))

        return mb_models

    def read_model(self, model: dict, schema: str) -> dict:
        """Reads one dbt model in Metabase-friendly format.

        Arguments:
            model {dict} -- One dbt model to read.
            schema {str} -- Database schema the model belongs to.

        Returns:
            dict -- One dbt model in Metabase-friendly format.
        """

        mb_columns = [self.read_column(column) for column in model.get('columns', [])]

        return {
            # Names are upper-cased to match how Metabase reports them.
            'name': model['name'].upper(),
            'description': model.get('description'),
            'columns': mb_columns,
            'schema': schema
        }

    def read_column(self, column: dict) -> dict:
        """Reads one dbt column in Metabase-friendly format.

        Arguments:
            column {dict} -- One dbt column to read.

        Returns:
            dict -- One dbt column in Metabase-friendly format.
        """

        mb_column = {
            'name': column.get('name', '').upper(),
            'description': column.get('description')
        }

        # A dbt 'relationships' test marks the column as a foreign key.
        for test in column.get('tests', []):
            if isinstance(test, dict) and 'relationships' in test:
                relationships = test['relationships']
                mb_column['special_type'] = 'type/FK'
                mb_column['fk_target_table'] = self.parse_ref(relationships['to']).upper()
                mb_column['fk_target_field'] = relationships['field'].upper()

        # Pass through whitelisted metabase.* meta fields.
        if 'meta' in column:
            meta = column.get('meta')
            for field in _META_FIELDS:
                if f'metabase.{field}' in meta:
                    mb_column[field] = meta[f'metabase.{field}']

        return mb_column

    @staticmethod
    def parse_ref(text: str) -> str:
        """Parses a dbt ref() statement.

        Arguments:
            text {str} -- Full statement in dbt YAML, e.g. "{{ ref('my_model') }}".

        Returns:
            str -- Name of the reference, or the input unchanged if it is
                not a ref() statement.
        """

        matches = re.findall(r"ref\(['\"]([\w\_\-\ ]+)['\"]\)", text)
        if matches:
            return matches[0]
        return text

    @staticmethod
    def get_schema_from_path(path) -> str:
        """Derives the target database schema from a model file's path.

        Arguments:
            path {Path} -- Path to a model YAML file inside the dbt project.

        Returns:
            str -- Schema name: 'stg', 'ods' or 'prs'.

        Raises:
            NotImplementedError -- If the path is not under a known models
                subfolder. (The original raised a NameError here via the
                typo `NotImplementedErrorNotImplemented`.)
        """
        posix = path.as_posix()
        if 'models/stg' in posix:
            return STG_SCHEMA
        if 'models/ods' in posix:
            return ODS_SCHEMA
        if 'models/prs' in posix:
            return PRS_SCHEMA
        raise NotImplementedError(f'Cannot infer schema from path: {posix}')
Loading