PySpark: tutorial completo sobre como usar filtros e joins

Aprenda a filtrar e unir DataFrames em PySpark com exemplos práticos e dicas de desempenho

No universo do big data, aprender a filtrar e unir DataFrames em PySpark é uma habilidade essencial para qualquer profissional da área. Neste tutorial completo, você descobrirá como utilizar operações de filtro e join de forma prática, maximizando a eficiência das suas análises e facilitando a extração de insights valiosos de grandes volumes de dados.

Você verá exemplos práticos e dicas de desempenho que ajudarão a otimizar seu código, além de entender como configurar seu ambiente para aproveitar ao máximo essa poderosa ferramenta do Apache Spark. Prepare-se para transformar suas tarefas diárias em um processo fluido e eficaz!

O que é PySpark?

PySpark é uma interface de programação em Python para o Apache Spark, um sistema de computação em cluster voltado para o processamento e análise de grandes volumes de dados. Utilizando PySpark, os desenvolvedores podem aproveitar a rica API do Spark para realizar diversas operações, desde manipulação de dados até a criação de modelos de machine learning.

Um dos principais benefícios do PySpark é que ele permite a execução de operações complexas utilizando a linguagem Python, facilitando o trabalho para quem já está familiarizado com essa linguagem. Com ele, é possível realizar consultas SQL, transferências de dados e processamento em memória, tornando o trabalho com dados muito mais eficiente.

Outra característica importante do PySpark é a capacidade de trabalhar com DataFrames, que são estruturas de dados semelhantes a tabelas em um banco de dados. Através dos DataFrames, você pode realizar análises complexas e relatórios de forma intuitiva e poderosa.

Usando os recursos certos, é possível filtrar e unir DataFrames de forma muito eficiente usando PySpark

Como instalar o PySpark?

A instalação do PySpark pode ser feita de várias maneiras. Aqui estão algumas opções comuns:

  1. Usando PyPI (pip):

    • Para a instalação padrão, utilize o comando:

    • Para instalar dependências adicionais, use:

      • Spark SQL:

      • API do pandas no Spark (inclui o Plotly para visualização):

      • Spark Connect:

    • Para uma versão específica do Hadoop, configure a variável de ambiente PYSPARK_HADOOP_VERSION:

  2. Usando Conda:

    • Crie e ative um novo ambiente Conda:

    • Instale o PySpark:

  3. Download Manual:

    • Baixe a distribuição do PySpark diretamente do site da Apache Spark e descompacte o arquivo:

    • Certifique-se de que a variável de ambiente SPARK_HOME aponte para o diretório descompactado e atualize a variável PYTHONPATH.

  4. Instalando a partir do código-fonte:

    • Para instruções sobre como compilar o PySpark a partir do código-fonte, consulte a seção "Building Spark" na documentação oficial.

Dependências: É necessário ter o Java 8, 11 ou 17 instalado, com a variável de ambiente JAVA_HOME configurada corretamente. Além disso, pacotes como py4j, pandas, pyarrow, e numpy podem ser exigidos, dependendo do uso do PySpark.

Como criar DataFrames em PySpark?

Para criar DataFrames em PySpark, você pode utilizar diferentes métodos, dependendo da fonte de dados:

1. Criar DataFrame a partir de um RDD

Um RDD pode ser criado usando o método parallelize() e, em seguida, transformá-lo em um DataFrame.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ExemploPySpark').getOrCreate()
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
rdd = spark.sparkContext.parallelize(data)

# Usando toDF()
df_from_rdd = rdd.toDF(["language", "users_count"])

2. Criar DataFrame a partir de uma lista

Você também pode criar um DataFrame diretamente de uma lista.

data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
df_from_list = spark.createDataFrame(data).toDF(*["language", "users_count"])

3. Criar DataFrame com um schema definido

Defina o esquema usando StructType para especificar o nome das colunas e seus tipos.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data2 = [("James", "Smith", "36636", "M", 3000), ("Michael", "Rose", "40288", "M", 4000)]
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])
df_with_schema = spark.createDataFrame(data=data2, schema=schema)

4. Criar DataFrame a partir de fontes de dados

Crie DataFrames a partir de arquivos CSV, TXT, JSON, entre outros formatos suportados.

df_from_csv = spark.read.csv("/caminho/para/arquivo.csv")
df_from_text = spark.read.text("/caminho/para/arquivo.txt")
df_from_json = spark.read.json("/caminho/para/arquivo.json")

Como usar filtros em DataFrames?

Para usar filtros em DataFrames em PySpark, você pode utilizar as funções filter() e where(), que operam de forma intercambiável. A seguir, alguns exemplos de como aplicar filtros em DataFrames:

Filtros Simples

  1. Filtro simples:

  2. Filtro negativo:

  3. Usando a função col():

  4. Expressões SQL:

Múltiplas Condições

  1. Aplicando múltiplas condições:

  2. Filtrando com listas:

Filtrando Estruturas Aninhadas

  • Filtrando arrays:

  • Utilizando expressões regulares:

Considerações

A filtragem é executada de forma lazy, ou seja, as operações são realizadas somente quando uma ação é chamada no DataFrame, permitindo otimizações no desempenho.

Como realizar joins entre DataFrames?

Para realizar joins entre DataFrames em PySpark, utilize o método join() para combinar dois ou mais DataFrames com base em colunas correspondentes. A sintaxe básica é:

dataframe1.join(dataframe2, dataframe1.column_name == dataframe2.column_name, "tipo")

Criando DataFrames de Exemplo

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# Primeiro DataFrame
data1 = [["1", "sravan", "company 1"], ["2", "ojaswi", "company 1"]]
columns1 = ['ID', 'NAME', 'Company']
dataframe1 = spark.createDataFrame(data1, columns1)

# Segundo DataFrame
data2 = [["1", "45000", "IT"], ["2", "145000", "Manager"], ["3", "34000", "Sales"]]
columns2 = ['ID', 'salary', 'department']
dataframe2 = spark.createDataFrame(data2, columns2)

Tipos de Joins

  1. Inner Join:

  2. Full Outer Join:

  3. Left Join:

  4. Right Join:

  5. Left Semi Join:

  6. Left Anti Join:

Usando SQL

Para realizar joins usando SQL, crie uma visualização temporária:

dataframe1.createOrReplaceTempView("df1")
dataframe2.createOrReplaceTempView("df2")

spark.sql("SELECT * FROM df1 INNER JOIN df2 ON df1.ID = df2.ID").show()

Dicas de desempenho ao usar filtros e joins

Ao utilizar filtros e joins em PySpark, aqui estão algumas práticas que podem otimizar o desempenho:

  1. Otimização do código: Evite operações desnecessárias e minimize a movimentação de dados entre os nós do cluster.

  2. Cache de DataFrames: Utilize o caching para manter os dados em memória e evitar recalculações.

  3. Particionamento adequado: Um particionamento eficaz ajuda a equilibrar a distribuição dos dados.

  4. Filtragem antecipada: Aplique filtros o mais cedo possível para reduzir o volume de dados.

  5. Escolha do tipo de join: Selecione o tipo de join correto para suas necessidades.

  6. Agrupamento após filtragem: Execute agrupamentos após filtros para limitar os dados a serem processados.

  7. Joins de broadcast: Utilize joins de broadcast para tabelas menores para otimizar o tempo de processamento.

  8. Monitoramento de desempenho: Use ferramentas como o Spark UI para observar o desempenho e ajustar conforme necessário.

  9. Evitar shuffling: Prefira .join() em vez de .cogroup() e utilize colunas já particionadas.

  10. Limpeza pós-join: Remova colunas desnecessárias com .select() para diminuir o volume de dados.

Seguir essas orientações ajudará a maximizar a eficiência das operações de joins e filtros em seus projetos de PySpark.

Conclusão

Resumindo, neste tutorial exploramos como utilizar filtros e joins em PySpark para manipulação eficaz de DataFrames. Você aprendeu a aplicar filtros simples e complexos, bem como diferentes formas de realizar joins, desde o inner até o left anti join, otimizando suas operações com dicas de desempenho para maximizar a eficiência no processamento de grandes volumes de dados.

Além disso, as práticas recomendadas que discutimos podem ajudar a evitar gargalos de desempenho e garantir que suas análises sejam não apenas precisas, mas também rápidas. Com essas ferramentas e conhecimentos, você está pronto para extrair insights valiosos e transformar dados brutos em informações significativas.