- Data Hackers Newsletter
- Posts
- PySpark: tutorial completo sobre como usar filtros e joins
PySpark: tutorial completo sobre como usar filtros e joins
Aprenda a filtrar e unir DataFrames em PySpark com exemplos práticos e dicas de desempenho
No universo do big data, aprender a filtrar e unir DataFrames em PySpark é uma habilidade essencial para qualquer profissional da área. Neste tutorial completo, você descobrirá como utilizar operações de filtro e join de forma prática, maximizando a eficiência das suas análises e facilitando a extração de insights valiosos de grandes volumes de dados.
Você verá exemplos práticos e dicas de desempenho que ajudarão a otimizar seu código, além de entender como configurar seu ambiente para aproveitar ao máximo essa poderosa ferramenta do Apache Spark. Prepare-se para transformar suas tarefas diárias em um processo fluido e eficaz!
O que é PySpark?
PySpark é uma interface de programação em Python para o Apache Spark, um sistema de computação em cluster voltado para o processamento e análise de grandes volumes de dados. Utilizando PySpark, os desenvolvedores podem aproveitar a rica API do Spark para realizar diversas operações, desde manipulação de dados até a criação de modelos de machine learning.
Um dos principais benefícios do PySpark é que ele permite a execução de operações complexas utilizando a linguagem Python, facilitando o trabalho para quem já está familiarizado com essa linguagem. Com ele, é possível realizar consultas SQL, transferências de dados e processamento em memória, tornando o trabalho com dados muito mais eficiente.
Outra característica importante do PySpark é a capacidade de trabalhar com DataFrames, que são estruturas de dados semelhantes a tabelas em um banco de dados. Através dos DataFrames, você pode realizar análises complexas e relatórios de forma intuitiva e poderosa.

Usando os recursos certos, é possível filtrar e unir DataFrames de forma muito eficiente usando PySpark
Como instalar o PySpark?
A instalação do PySpark pode ser feita de várias maneiras. Aqui estão algumas opções comuns:
Usando PyPI (pip):
Para a instalação padrão, utilize o comando:
Para instalar dependências adicionais, use:
Spark SQL:
API do pandas no Spark (inclui o Plotly para visualização):
Spark Connect:
Para uma versão específica do Hadoop, configure a variável de ambiente
PYSPARK_HADOOP_VERSION:
Usando Conda:
Crie e ative um novo ambiente Conda:
Instale o PySpark:
Download Manual:
Baixe a distribuição do PySpark diretamente do site da Apache Spark e descompacte o arquivo:
Certifique-se de que a variável de ambiente
SPARK_HOMEaponte para o diretório descompactado e atualize a variávelPYTHONPATH.
Instalando a partir do código-fonte:
Para instruções sobre como compilar o PySpark a partir do código-fonte, consulte a seção "Building Spark" na documentação oficial.
Dependências: É necessário ter o Java 8, 11 ou 17 instalado, com a variável de ambiente JAVA_HOME configurada corretamente. Além disso, pacotes como py4j, pandas, pyarrow, e numpy podem ser exigidos, dependendo do uso do PySpark.
Como criar DataFrames em PySpark?
Para criar DataFrames em PySpark, você pode utilizar diferentes métodos, dependendo da fonte de dados:
1. Criar DataFrame a partir de um RDD
Um RDD pode ser criado usando o método parallelize() e, em seguida, transformá-lo em um DataFrame.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ExemploPySpark').getOrCreate()
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
rdd = spark.sparkContext.parallelize(data)
# Usando toDF()
df_from_rdd = rdd.toDF(["language", "users_count"])
2. Criar DataFrame a partir de uma lista
Você também pode criar um DataFrame diretamente de uma lista.
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
df_from_list = spark.createDataFrame(data).toDF(*["language", "users_count"])
3. Criar DataFrame com um schema definido
Defina o esquema usando StructType para especificar o nome das colunas e seus tipos.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
data2 = [("James", "Smith", "36636", "M", 3000), ("Michael", "Rose", "40288", "M", 4000)]
schema = StructType([
StructField("firstname", StringType(), True),
StructField("lastname", StringType(), True),
StructField("id", StringType(), True),
StructField("gender", StringType(), True),
StructField("salary", IntegerType(), True)
])
df_with_schema = spark.createDataFrame(data=data2, schema=schema)
4. Criar DataFrame a partir de fontes de dados
Crie DataFrames a partir de arquivos CSV, TXT, JSON, entre outros formatos suportados.
df_from_csv = spark.read.csv("/caminho/para/arquivo.csv")
df_from_text = spark.read.text("/caminho/para/arquivo.txt")
df_from_json = spark.read.json("/caminho/para/arquivo.json")
Como usar filtros em DataFrames?
Para usar filtros em DataFrames em PySpark, você pode utilizar as funções filter() e where(), que operam de forma intercambiável. A seguir, alguns exemplos de como aplicar filtros em DataFrames:
Filtros Simples
Filtro simples:
Filtro negativo:
Usando a função
col():Expressões SQL:
Múltiplas Condições
Aplicando múltiplas condições:
Filtrando com listas:
Filtrando Estruturas Aninhadas
Filtrando arrays:
Utilizando expressões regulares:
Considerações
A filtragem é executada de forma lazy, ou seja, as operações são realizadas somente quando uma ação é chamada no DataFrame, permitindo otimizações no desempenho.
Como realizar joins entre DataFrames?
Para realizar joins entre DataFrames em PySpark, utilize o método join() para combinar dois ou mais DataFrames com base em colunas correspondentes. A sintaxe básica é:
dataframe1.join(dataframe2, dataframe1.column_name == dataframe2.column_name, "tipo")
Criando DataFrames de Exemplo
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# Primeiro DataFrame
data1 = [["1", "sravan", "company 1"], ["2", "ojaswi", "company 1"]]
columns1 = ['ID', 'NAME', 'Company']
dataframe1 = spark.createDataFrame(data1, columns1)
# Segundo DataFrame
data2 = [["1", "45000", "IT"], ["2", "145000", "Manager"], ["3", "34000", "Sales"]]
columns2 = ['ID', 'salary', 'department']
dataframe2 = spark.createDataFrame(data2, columns2)
Tipos de Joins
Inner Join:
Full Outer Join:
Left Join:
Right Join:
Left Semi Join:
Left Anti Join:
Usando SQL
Para realizar joins usando SQL, crie uma visualização temporária:
dataframe1.createOrReplaceTempView("df1")
dataframe2.createOrReplaceTempView("df2")
spark.sql("SELECT * FROM df1 INNER JOIN df2 ON df1.ID = df2.ID").show()
Dicas de desempenho ao usar filtros e joins
Ao utilizar filtros e joins em PySpark, aqui estão algumas práticas que podem otimizar o desempenho:
Otimização do código: Evite operações desnecessárias e minimize a movimentação de dados entre os nós do cluster.
Cache de DataFrames: Utilize o caching para manter os dados em memória e evitar recalculações.
Particionamento adequado: Um particionamento eficaz ajuda a equilibrar a distribuição dos dados.
Filtragem antecipada: Aplique filtros o mais cedo possível para reduzir o volume de dados.
Escolha do tipo de join: Selecione o tipo de join correto para suas necessidades.
Agrupamento após filtragem: Execute agrupamentos após filtros para limitar os dados a serem processados.
Joins de broadcast: Utilize joins de broadcast para tabelas menores para otimizar o tempo de processamento.
Monitoramento de desempenho: Use ferramentas como o Spark UI para observar o desempenho e ajustar conforme necessário.
Evitar shuffling: Prefira
.join()em vez de.cogroup()e utilize colunas já particionadas.Limpeza pós-join: Remova colunas desnecessárias com
.select()para diminuir o volume de dados.
Seguir essas orientações ajudará a maximizar a eficiência das operações de joins e filtros em seus projetos de PySpark.
Conclusão
Resumindo, neste tutorial exploramos como utilizar filtros e joins em PySpark para manipulação eficaz de DataFrames. Você aprendeu a aplicar filtros simples e complexos, bem como diferentes formas de realizar joins, desde o inner até o left anti join, otimizando suas operações com dicas de desempenho para maximizar a eficiência no processamento de grandes volumes de dados.
Além disso, as práticas recomendadas que discutimos podem ajudar a evitar gargalos de desempenho e garantir que suas análises sejam não apenas precisas, mas também rápidas. Com essas ferramentas e conhecimentos, você está pronto para extrair insights valiosos e transformar dados brutos em informações significativas.