= Links =
* [https://docs.databricks.com/aws/en Documentation (AWS)]
* [https://customer-academy.databricks.com Databricks Academy]
* [https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-functions-builtin Built-In SQL Functions]
= Description =
Databricks combines a Data Lakehouse with Generative AI into a Data Intelligence Platform.<br>
Generative AI makes it possible to query data in natural language and to optimize storage and costs based on previous usage.<br>
[[File:Databricks.jpg|120px]]
= Data objects =
== Delta Table ==
=== Create ===
<kode lang='dbsql'>
-- create with schema
create table if not exists table1 (
  id string,
  date_string string,
  date date generated always as (
    cast(date_string as date))
    comment "generated based on `date_string` column");

-- create from other data
create or replace table table1 as
  select * from parquet.`${parquet-data-folder}`;

drop table if exists table1;

-- deep clone clones the metadata and the data
-- shallow clone clones only the metadata
create or replace table cloned_table
  deep clone source_table;
</kode>
=== Alter ===
<kode lang='dbsql'>
alter table table1
  add constraint constraint1 check (date > '2025-01-30');
</kode>
=== Describe ===
<kode lang='dbsql'>
describe table1;          -- display the column names and types
describe extended table1; -- display additional information
describe history table1;  -- display the delta log for each data change
</kode>
=== [https://docs.databricks.com/aws/en/delta/clustering Liquid clustering] ===
<kode lang='dbsql'>
create or replace table table1 cluster by (id) as
  select * from table0;

alter table table1
  cluster by (id);

optimize table1; -- schedule a regular optimize job to cluster the data (e.g. every hour)
</kode>
== View ==
A temporary view is available while the cluster is running, but it is not stored in the schema.
<kode lang='dbsql'>
create or replace temp view view1 as
  select * from csv.`${csv_path}`;
</kode>
== Functions ==
= Data manipulation =
== copy ==
<kode lang='dbsql'>
-- idempotent
copy into table1
  from '${parquet-data-folder}'
  fileformat = parquet
  copy_options ('mergeSchema' = 'true');

copy into table1
  from (select *,
          cast(column2 as date) column2_date,
          current_timestamp() updated,
          input_file_name() source_file
        from '${parquet-data-folder}')
  fileformat = parquet
  copy_options ('mergeSchema' = 'true');
</kode>
== merge ==
<kode lang='dbsql'>
merge into destination_table dst
using source_table src
on dst.id = src.id
when matched and dst.column1 is null and src.column1 is not null then
  update set column1 = src.column1, column2 = src.column2
when not matched then
  insert (id, column1, column2)
  values (src.id, src.column1, src.column2);

-- insert only rows that do not already exist (avoid merging duplicates)
merge into destination_table dst
using source_table src
on dst.id = src.id
when not matched then
  insert *;
</kode>
== distinct ==
<kode lang='dbsql'>
-- overwrite the table, filtering out the duplicates
insert overwrite table1
  select distinct * from table1;
</kode>
= Components =
== Delta Lake ==
The data lakehouse storage:
* ACID transactions
* Scalable data and metadata handling
* Audit history and time travel (querying previous versions of the data)
* Schema enforcement and evolution
* Streaming and batch data processing
* uses Delta Tables (an enhanced version of Apache Parquet files, a columnar storage file format optimized for efficient storage and retrieval of large-scale datasets)
== Unity Catalog ==
The data governance module:
* data federation: unified view of data from multiple sources
* handles access permissions to data
* AI-driven monitoring and reporting
== Photon ==
Query engine used to improve query performance, reduce costs, and optimize resource utilization.
== Databricks SQL ==
Data warehouse component:
* text to SQL queries
* auto-scaling for better performance and cost
== Workflows ==
Orchestration:
* intelligent pipeline triggering (scheduled, file arrival trigger, delta table update); see the job-creation sketch below
* automatic resource allocation
* automatic checkpoint and recovery (in the event of a failure, the pipeline recovers from the last checkpoint)
* automatic monitoring and alerts (errors, timeouts)
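A hedged sketch of creating a scheduled job through the Jobs REST API 2.1; the workspace URL, token, cluster id and notebook path are placeholders, not values from this page.
<kode lang='py'>
import requests

host = "https://<workspace>.cloud.databricks.com"  # placeholder workspace URL
headers = {"Authorization": "Bearer <token>"}      # placeholder personal access token

job_spec = {
    "name": "daily-refresh",
    "tasks": [{
        "task_key": "refresh",
        "notebook_task": {"notebook_path": "/Workspace/Users/me@example.com/refresh"},
        "existing_cluster_id": "<cluster-id>",
    }],
    # scheduled trigger; file-arrival triggers are configured through a "trigger" field instead
    "schedule": {"quartz_cron_expression": "0 0 6 * * ?", "timezone_id": "UTC"},
}

response = requests.post(f"{host}/api/2.1/jobs/create", headers=headers, json=job_spec)
print(response.json())  # contains the new job_id on success
</kode>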
== Delta Live Tables ==
ETL & Real-time Analytics
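A minimal Delta Live Tables sketch in Python; the table names and source path are hypothetical, not taken from this page.
<kode lang='py'>
import dlt
from pyspark.sql.functions import col

@dlt.table(comment="raw events loaded from a volume (placeholder path)")
def bronze_events():
    return spark.read.json("/Volumes/catalog1/schema1/volume1/events/")

@dlt.table(comment="cleaned events")
@dlt.expect_or_drop("valid_id", "id IS NOT NULL")  # data-quality expectation: drop rows with a null id
def silver_events():
    return dlt.read("bronze_events").select(col("id").cast("int"), col("description"))
</kode>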
== Databricks AI ==
Data Science and AI
= Menu =
{| class="wikitable wtp"
! Entry
! Description
|-
| Workspace || Store code, files and projects.
* Home: personal space
* Workspace: shared space, repositories
|-
| Catalog || Manage and organize data assets: databases, tables, views and functions
|-
| Workflow || Manage jobs and pipelines
|-
| Compute || Manage clusters
|}
= Code =
<kode lang='python'>
import os
import shutil
import requests
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.utils import AnalysisException

catalog_name = "catalog1"
schema_name = "schema1"
volume_name = "volume1"
table_name = "table1"

spark.sql(f"CREATE CATALOG IF NOT EXISTS `{catalog_name}`")  # create a catalog
spark.sql(f"USE CATALOG `{catalog_name}`")                   # set as the default catalog
spark.sql(f"CREATE SCHEMA IF NOT EXISTS `{schema_name}`")    # create a schema
spark.sql(f"USE SCHEMA `{schema_name}`")                     # set as the default schema
spark.sql(f"CREATE VOLUME IF NOT EXISTS `{volume_name}`")    # create a volume

volume_full_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}"
target_table_full_path = f"`{catalog_name}`.`{schema_name}`.`{table_name}`"

df = spark.table(target_table_full_path)
display(df)
</kode>
== Data Analysis ==
<kode lang='python'>
# variables
catalog_name = "catalog1"
schema_name = "schema1"
volume_name = "volume1"
bronze_table_name = "bronze"  # raw data
silver_table_name = "silver"  # processed and cleaned data
gold_table_name = "gold"      # aggregated data with business insights

# widgets
dbutils.widgets.text("catalog_name", catalog_name)
dbutils.widgets.text("schema_name", schema_name)
dbutils.widgets.text("volume_name", volume_name)
dbutils.widgets.text("bronze_table", bronze_table_name)
dbutils.widgets.text("silver_table", silver_table_name)
dbutils.widgets.text("gold_table", gold_table_name)
</kode>
<kode lang='DBSQL'>
create catalog if not exists identifier(:catalog_name);
use catalog identifier(:catalog_name);

create schema if not exists identifier(:schema_name);
use schema identifier(:schema_name);

create volume if not exists identifier(:volume_name);

select * from identifier(:bronze_table);

create or replace table identifier(:silver_table) as
  select cast(column1 as float) as column1, cast(column2 as int) as column2
  from identifier(:bronze_table)
  where try_cast(column1 as float) is not null
    and try_cast(column2 as int) > 100
  order by column1;

-- get the column names and types of a table
describe identifier(:silver_table);
</kode>
= [https://docs.databricks.com/aws/en/notebooks/widgets Using Widgets for SQL Parameterization] =
The {{boxx|identifier()}} function ensures that the widget value is treated as a valid database object name.
<kode lang='python'>
# create a widget
dbutils.widgets.text("table_name", "table1")

dbutils.widgets.remove("table_name")  # remove a widget
dbutils.widgets.removeAll()
</kode>
<kode lang='DBSQL'>
select * from identifier(:table_name);
</kode>
= Job =
== Job Parameters ==
<kode lang='py'>
# create a widget that can be set as a job parameter
dbutils.widgets.text("key", "default value")
value = dbutils.widgets.get("key")
</kode>
== Task values ==
Set a task value that is accessible from other tasks.
<kode lang='py'>
dbutils.jobs.taskValues.set('key', 'value')
</kode>
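A downstream task can read the value with {{boxx|dbutils.jobs.taskValues.get}}; a sketch, where {{boxx|TaskName}} is a placeholder for the task key that set the value.
<kode lang='py'>
# 'default' is returned if the upstream task did not set the key;
# 'debugValue' is returned when the notebook runs outside of a job
value = dbutils.jobs.taskValues.get(taskKey="TaskName", key="key", default="", debugValue="")
</kode>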
<kode lang='json'>
// get the task value (in the condition field for instance)
{{tasks.TaskName.values.key}}
</kode>
== Job Contexts ==
= Pipeline =
* ETL
* Unit test / data quality (see the sketch below)
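A minimal sketch of a data-quality check that could run as a pipeline or job task; the table and column names are placeholders.
<kode lang='py'>
# fail the task if the silver table contains null or duplicate ids
df = spark.table("catalog1.schema1.silver")

null_ids = df.filter(df.id.isNull()).count()
duplicate_ids = df.count() - df.select("id").distinct().count()

assert null_ids == 0, f"{null_ids} rows have a null id"
assert duplicate_ids == 0, f"{duplicate_ids} duplicate ids found"
</kode>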
= JSON =
<kode lang='py'>
# read json from a file and create a data frame
df = spark.read.json(json_file_path)

# read json from a df column and parse it
from pyspark.sql.functions import col, from_json
schema = "id INT, description STRING"
df = df.select(from_json(col("column1"), schema).alias("json")).select("json.*")
</kode>
<kode lang='dbsql'>
create temporary view multiLineJsonTable
using json
options (path = "/Volumes/catalog_name/schema_name/volume_name/file.json", multiline = "true");

select schema_of_json(`${json}`) as schema;

create or replace temp view view1 as
  select json.*
  from (
    select from_json(value, `${schema}`) as json
    from table1);
</kode>
== Pandas ==
<kode lang='py'>
import pandas as pd
from pandas import json_normalize

# read json from memory and create a data frame
pdf = pd.DataFrame(json_data)
pdf = pd.DataFrame(json_data['nested_property_name'])
pdf = json_normalize(json_data)
pdf = json_normalize(json_data, errors='ignore')
pdf = json_normalize(json_data, 'nested_property_name', errors='ignore')

# convert the pandas df to a spark df
df = spark.createDataFrame(pdf)
</kode>
= CSV =
<kode lang='python'>
catalog_name = "catalog1"
schema_name = "schema1"
volume_name = "volume1"
volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}"
csv_dataset_path = f"{volume_path}/data.csv"

df = spark.read.csv(csv_dataset_path, header=True, inferSchema=True)
display(df)
df.write.saveAsTable("data")
</kode>
<kode lang='dbsql'>
select * from csv.`${csv_dataset_path}` limit 10;
select * from csv.`/Volumes/catalog1/schema1/volume1/data.csv` limit 10;
</kode>
= SQL User Defined Function =
<kode lang='dbsql'>
create or replace function fct1(id int, name string)
  returns string
  return concat("Item ", id, " is named ", name);

select *, fct1(id, name) as message
from table1;

describe function extended fct1;
</kode>
{{info | SQL UDFs persist between execution environments.}}
= Time Travel =
<kode lang='dbsql'>
describe history table1; -- display the delta log for each data change

-- query table1 at a specific version
select *
from table1 version as of 3;

-- rollback
restore table table1 to version as of 3;
</kode>
= API request =
<kode lang='py'>
import requests, datetime, json

today = datetime.date.today().strftime("%Y-%m-%d")
dayplusone = (datetime.date.today() + datetime.timedelta(days=1)).strftime("%Y-%m-%d")

url = f"https://www.domain.net/api?begin={today}&end={dayplusone}"
headers = {"X-API-KEY": "xxx"}
response = requests.get(url, headers=headers)
json_data = response.json()
</kode>
= Read and write files in volume =
<kode lang='py'>
json_file_path = f"{volume_path}/file.json"

# write
dbutils.fs.put(json_file_path, json.dumps(json_data), overwrite=True)

# read into a data frame
df = spark.read.json(json_file_path)
</kode>
= SQL Date Time =
<kode lang='dbsql'>
-- string to timestamp_ntz
select to_timestamp_ntz('2025-04-24 00:00+02:00', 'yyyy-MM-dd HH:mmXXX') as converted_timestamp_ntz;
</kode>
= History =
{| class="wikitable wtp"
|-
| 1980 - Data warehouse || Collect and store structured data to provide support for refined analysis and reporting.
|-
| 2000 - Data lake || Collect and store raw data and conduct exploratory analysis.
|-
| 2021 - Data lakehouse || Unified platform combining the benefits of data lakes and data warehouses.
|}

{| class="wikitable wtp"
! Aspect
! Data Warehouse
! Data Lake
! Data Lakehouse
|-
| Data Type || Structured, processed, and refined data || Raw data: structured, semi-structured, and unstructured || Combines raw and processed data
|-
| Schema || Schema-on-write: data is structured before storage || Schema-on-read: structure applied when accessed || Flexible: schema-on-read for raw data, schema-on-write for structured data
|-
| Purpose || Optimized for business intelligence (BI), reporting, and predefined analytics || Designed for big data analytics, machine learning, and exploratory analysis || Unified analytics platform for BI, AI/ML, streaming, and real-time analytics
|-
| Processing Approach || ETL: data is cleaned and transformed before storage || ELT: data is loaded first and transformed as needed || Both ETL and ELT; enables real-time processing
|-
| Scalability || Less scalable and more expensive to scale || Highly scalable and cost-effective for large volumes of diverse data || Combines the scalability of lakes with the performance optimization of warehouses
|-
| Users || Business analysts and decision-makers || Data scientists, engineers, and analysts || BI teams, data scientists, engineers
|-
| Accessibility || More rigid; changes to structure are complex || Flexible; easy to update and adapt || Highly adaptable; supports schema evolution
|-
| Security & Maturity || Mature security measures; better suited for sensitive data || Security measures evolving; risk of "data swamp" if not managed properly || Strong governance with ACID transactions; improved reliability
|-
| Use Cases || Operational reporting, dashboards, KPIs || Predictive analytics, AI/ML models, real-time analytics || Unified platform for BI dashboards, AI/ML workflows, streaming analytics
|}