= Links =
* [https://docs.databricks.com/aws/en Documentation (AWS)]
* [https://customer-academy.databricks.com Databricks Academy]
* [https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-functions-builtin Built-In SQL Functions]
= Description =
Databricks combines a Data Lakehouse with Generative AI into a Data Intelligence Platform.<br>
Generative AI allows the use of natural language to fetch data and helps optimize storage and costs based on previous usage.<br>
[[File:Databricks.jpg|120px]]
= Data objects =
== Delta Table ==
=== Create ===
<kode lang='dbsql'>
-- create with schema
create table if not exists table1 (
  id string,
  date_string string,
  date date generated always as (
    cast(date_string as date))
  comment "generated based on `date_string` column");
-- create from other data
create or replace table table1 as
select * from parquet.`${parquet-data-folder}`;
drop table if exists table1;
-- deep clone clones the metadata and the data
-- shallow clone clones only the metadata
create or replace table cloned_table
deep clone source_table;
</kode>
=== Alter ===
<kode lang='dbsql'>
alter table table1
add constraint constraint1 check (date > '2025-01-30');
</kode>
=== Describe ===
<kode lang='dbsql'>
describe table1; -- display the column names and types
describe extended table1; -- display additional information
describe history table1; -- display delta log for each data change
</kode>
=== [https://docs.databricks.com/aws/en/delta/clustering Liquid clustering] ===
<kode lang='dbsql'>
create or replace table table1 cluster by (id) as
select * from table0;
alter table table1
cluster by (id);
optimize table1; -- schedule regular optimize job to cluster data (every hour)
</kode>
== View ==
A temporary view is available while the cluster is running but it is not stored in the schema.
<kode lang='dbsql'>
create or replace temp view view1 as
select * from csv.`${csv_path}`;
</kode>
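From Python, the equivalent is to register a DataFrame as a temporary view (a minimal sketch, assuming a DataFrame {{boxx|df}} already exists):
<kode lang='python'>
# register a DataFrame as a session-scoped temporary view, queryable from SQL
df.createOrReplaceTempView("view1")
display(spark.sql("select * from view1 limit 10"))
</kode>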
== Functions ==
= Data manipulation =
== copy ==
<kode lang='dbsql'>
-- idempotent
copy into table1
from '${parquet-data-folder}'
fileformat = parquet
copy_options ('mergeSchema' = 'true');
copy into table1
from (select *,
cast(column2 as date) column2_date,
current_timestamp() updated,
input_file_name() source_file
from '${parquet-data-folder}')
fileformat = parquet
copy_options ('mergeSchema' = 'true');
</kode>
== merge ==
<kode lang='dbsql'>
merge into destination_table dst
using source_table src
on dst.id = src.id
when matched and dst.column1 is null and src.column1 is not null then
  update set column1 = src.column1, column2 = src.column2
when not matched then
  insert (id, column1, column2)
  values (src.id, src.column1, src.column2);
-- insert-only merge: existing rows are not inserted again (no duplicates)
merge into destination_table dst
using source_table src
on dst.id = src.id
when not matched then
  insert *;
</kode>
== distinct ==
<kode lang='dbsql'>
-- overwrite the data table filtering out the duplicates
insert overwrite table1
  select distinct * from table1;
</kode>
= Components =
== Delta Lake ==
The data lakehouse storage layer:
* ACID transactions
* Scalable data and metadata handling
* Audit history and time travel (querying previous versions of the data)
* Schema enforcement and evolution (see the sketch below)
* Streaming and batch data processing
* uses Delta Tables (an enhanced version of Apache Parquet files, a columnar storage format optimized for efficient storage and retrieval of large-scale datasets)
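A minimal sketch of schema evolution on write (assumption: {{boxx|new_df}} is a DataFrame holding extra columns compared to {{boxx|table1}}):
<kode lang='python'>
# append data whose schema has extra columns; mergeSchema evolves the table schema
new_df.write \
    .format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .saveAsTable("table1")
</kode>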
== Unity Catalog ==
The data governance module:
* data federation: unified view of data from multiple sources
* handle access permissions to data
* AI-driven monitoring and reporting
== Photon ==
Query engine used to improve query performance, reduce costs, and optimize resource utilization.
== Databricks SQL ==
Data warehouse component:
* text to SQL queries
* auto-scaling for better performance and cost
== Workflows ==
Orchestration:
* intelligent pipeline triggering (scheduled, file arrival trigger, delta table update)
* automatic resource allocation
* automatic checkpoint and recovery (in the event of a failure, the pipeline recovers from the last checkpoint)
* automatic monitoring and alerting (errors, timeouts)
== Delta Live Tables ==
ETL & Real-time Analytics
== Databricks AI ==
Data Science and AI
= Menu =
{| class="wikitable wtp"
! Entry
! Description
|-
| Workspace || Store code, files and projects.
* Home: personal space
* Workspace: shared space, repositories
|-
| Catalog || Manage and organize data assets: databases, tables, views and functions
|-
| Workflow || Manage jobs and pipelines
|-
| Compute || Manage clusters
|}
= Code =
<kode lang='python'>
import os
import shutil
import requests
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.utils import AnalysisException
catalog_name = "catalog1"
schema_name = "schema1"
volume_name = "volume1"
table_name = "table1"
spark.sql(f"CREATE CATALOG IF NOT EXISTS `{catalog_name}`")  # create a catalog
spark.sql(f"USE CATALOG `{catalog_name}`")                  # set as the default catalog
spark.sql(f"CREATE SCHEMA IF NOT EXISTS `{schema_name}`")    # create a schema
spark.sql(f"USE SCHEMA `{schema_name}`")                    # set as the default schema
spark.sql(f"CREATE VOLUME IF NOT EXISTS `{volume_name}`")    # create a volume
volume_full_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}"
target_table_full_path = f"`{catalog_name}`.`{schema_name}`.`{table_name}`"
df = spark.table(target_table_full_path)
display(df)
</kode>
== Data Analysis ==
<kode lang='python'>
# variables
catalog_name = "catalog1"
schema_name = "schema1"
bronze_table_name = "bronze" # raw data
silver_table_name = "silver" # processed and cleaned data
gold_table_name = "gold" # aggregated data with business insights
# widgets
dbutils.widgets.text("catalog_name", catalog_name)
dbutils.widgets.text("schema_name", schema_name)
dbutils.widgets.text("bronze_table", bronze_table_name)
dbutils.widgets.text("silver_table", silver_table_name)
dbutils.widgets.text("gold_table", gold_table_name)
</kode>
<kode lang='DBSQL'>
create catalog if not exists identifier(:catalog_name);
use catalog identifier(:catalog_name);
create schema if not exists identifier(:schema_name);
use schema identifier(:schema_name);
create volume if not exists identifier(:volume_name);
select * from identifier(:bronze_table);
create or replace table identifier(:silver_table) as
select cast(column1 as float), cast(column2 as int)
from identifier(:bronze_table)
where try_cast(column1 as float) is not null
and try_cast(column2 as int) > 100
order by column1;
-- get the column names and types of a table
describe identifier(:silver_table);
</kode>
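The {{boxx|gold_table}} widget above is declared but not used in the SQL cell; a minimal Python sketch of the aggregation step (assumption: the silver table keeps {{boxx|column1}} and {{boxx|column2}} as above, and the aggregation itself is only illustrative):
<kode lang='python'>
from pyspark.sql import functions as F

# aggregate the cleaned silver data into a gold table with business-level metrics
silver_df = spark.table(dbutils.widgets.get("silver_table"))
gold_df = (silver_df
           .groupBy("column2")
           .agg(F.avg("column1").alias("avg_column1"),
                F.count("*").alias("row_count")))
gold_df.write.mode("overwrite").saveAsTable(dbutils.widgets.get("gold_table"))
</kode>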
= [https://docs.databricks.com/aws/en/notebooks/widgets Using Widgets for SQL Parameterization] =
The {{boxx|identifier()}} function ensures that the widget value is treated as a valid database object name.
<kode lang='python'>
# create a widget
dbutils.widgets.text("table_name", "table1")
dbutils.widgets.remove("table_name") # remove a widget
dbutils.widgets.removeAll()
</kode>
<kode lang='DBSQL'>
select * from identifier(:table_name);
</kode>
= Job =
== Job Parameters ==
<kode lang='py'>
# create a widget that could be set as a job parameter
dbutils.widgets.text("key", "default value")
value = dbutils.widgets.get("key")
</kode>
== Task values ==
Set a task value that is accessible from other tasks.
<kode lang='py'>
dbutils.jobs.taskValues.set('key', 'value')
</kode>
<kode lang='json'>
// get the task value (in the condition field for instance)
{{tasks.TaskName.values.key}}
</kode>
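To read the value from a downstream Python task, {{boxx|dbutils.jobs.taskValues.get()}} can be used (a sketch; the task key is assumed to match the name of the task that set the value):
<kode lang='py'>
# get the task value set by the task named 'TaskName'
# debugValue is returned when the notebook runs outside of a job
value = dbutils.jobs.taskValues.get(taskKey="TaskName", key="key", default="", debugValue="")
</kode>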
== Job Contexts ==
= Pipeline =
* ETL
* Unit test / data quality (see the sketch below)
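A minimal Delta Live Tables sketch combining both points (assumptions: a {{boxx|bronze}} dataset already exists in the pipeline, and the expectation name and rule are illustrative):
<kode lang='py'>
import dlt
from pyspark.sql.functions import col

# ETL step with a data quality expectation: rows with a null id are dropped
@dlt.table(comment="cleaned data")
@dlt.expect_or_drop("valid_id", "id is not null")
def silver():
    return dlt.read("bronze").where(col("column1") > 0)
</kode>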
= JSON =
<kode lang='py'>
# read json from file and create a data frame
df = spark.read.json(json_file_path)
# read json from a df column and parse it
from pyspark.sql.functions import col, from_json
schema = "id INT, description STRING"
df = df.select(from_json(col("column1"), schema).alias("json")).select("json.*")
</kode>
<kode lang='dbsql'>
create temporary view multiLineJsonTable
using json
options (path = "/Volumes/catalog_name/schema_name/volume_name/file.json", multiline = "true");
select schema_of_json('${json}') as schema;
create or replace temp view view1 as
  select json.*
  from (
    select from_json(value, '${schema}') as json
    from table1);
</kode>
== Pandas ==
<kode lang='py'>
import pandas as pd
from pandas import json_normalize
# read json from memory and create a data frame
pdf = pd.DataFrame(json_data)
pdf = pd.DataFrame(json_data['nested_property_name'])
pdf = json_normalize(json_data)
pdf = json_normalize(json_data, errors='ignore')
pdf = json_normalize(json_data, 'nested_property_name', errors='ignore')
# convert pandas df to spark df
df = spark.createDataFrame(pdf)
</kode>
= CSV =
<kode lang='python'>
catalog_name = "catalog1"
schema_name = "schema1"
volume_name = "volume1"
volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}"
csv_dataset_path = f"{volume_path}/data.csv"
df = spark.read.csv(csv_dataset_path, header=True, inferSchema=True)
display(df)
df.write.saveAsTable("data")
</kode>
<kode lang='dbsql'>
select * from csv.`${csv_dataset_path}` limit 10;
select * from csv.`/Volumes/catalog1/schema1/volume1/data.csv` limit 10;
</kode>
= SQL User Defined Function =
<kode lang='dbsql'>
create or replace function fct1(id int, name string)
returns string
return concat("Item ", id, " is named ", name);
select *, fct1(id, name) as message
from table1;
describe function extended fct1;
</kode>
{{info | SQL UDFs persist between execution environments.}}
= Time Travel =
<kode lang='dbsql'>
describe history table1; -- display delta log for each data change
-- query the table1 at a specific version
select *
from table1 version as of 3;
-- rollback
restore table table1 to version as of 3;
</kode>
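The same versions can be read from Python through DataFrameReader options (a sketch; {{boxx|timestampAsOf}} can be used instead of {{boxx|versionAsOf}}):
<kode lang='py'>
# read table1 as it was at version 3
df_v3 = spark.read.option("versionAsOf", 3).table("table1")
display(df_v3)
</kode>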
= API request =
<kode lang='py'>
import requests, datetime, json
today = datetime.date.today().strftime("%Y-%m-%d")
dayplusone = (datetime.date.today() + datetime.timedelta(days=1)).strftime("%Y-%m-%d")
url = f"https://www.domain.net/api?begin={today}&end={dayplusone}"
headers = {"X-API-KEY": "xxx"}
response = requests.get(url, headers=headers)
json_data = response.json()
</kode>
= Read and write files in volume =
<kode lang='py'>
json_file_path = f"{volume_path}/file.json"
# write
dbutils.fs.put(json_file_path, json.dumps(json_data), overwrite=True)
# read into data frame
df = spark.read.json(json_file_path)
</kode>
= SQL Date Time =
<kode lang='dbsql'>
-- string to timestamp_ntz
select to_timestamp_ntz('2025-04-24 00:00+02:00', 'yyyy-MM-dd HH:mmXXX') AS converted_timestamp_ntz;
</kode>


= History =
{| class="wikitable wtp"
|-
| 1980 - Data warehouse || Collect and store structured data to provide support for refined analysis and reporting.
|-
| 2000 - Data lake || Collect and store raw data and conduct exploratory analysis.
|-
| 2021 - Data lakehouse || Unified platform that combines the benefits of both data lakes and data warehouses.
|}

{| class="wikitable wtp"
! Aspect
! Data Warehouse
! Data Lake
! Data Lakehouse
|-
| Data Type || Structured, processed, and refined data || Raw data: structured, semi-structured, and unstructured || Combines raw and processed data
|-
| Schema || Schema-on-write: data is structured before storage || Schema-on-read: structure applied when accessed || Flexible: schema-on-read for raw data; schema-on-write for structured data
|-
| Purpose || Optimized for business intelligence (BI), reporting, and predefined analytics || Designed for big data analytics, machine learning, and exploratory analysis || Unified analytics platform for BI, AI/ML, streaming, and real-time analytics
|-
| Processing Approach || ETL: data is cleaned and transformed before storage || ELT: data is loaded first and transformed as needed || Both ETL and ELT; enables real-time processing
|-
| Scalability || Less scalable and more expensive to scale || Highly scalable and cost-effective for large volumes of diverse data || Combines scalability of lakes with performance optimization of warehouses
|-
| Users || Business analysts and decision-makers || Data scientists, engineers, and analysts || BI teams, data scientists, engineers
|-
| Accessibility || More rigid; changes to structure are complex || Flexible; easy to update and adapt || Highly adaptable; supports schema evolution
|-
| Security & Maturity || Mature security measures; better suited for sensitive data || Security measures evolving; risk of "data swamp" if not managed properly || Strong governance with ACID transactions; improved reliability
|-
| Use Cases || Operational reporting, dashboards, KPIs || Predictive analytics, AI/ML models, real-time analytics || Unified platform for BI dashboards, AI/ML workflows, streaming analytics
|}