Google Cloud Dataflow: Streaming y Batch Processing

"Dataflow es el servicio completamente administrado de Google para ejecutar pipelines de Apache Beam a escala."


🌊 What Is Cloud Dataflow?

Cloud Dataflow is a unified data processing platform for building both batch and real-time streaming pipelines using Apache Beam.


🏗️ Architecture and Concepts

Unified Programming Model

# The same pipeline code runs in batch or streaming mode; only the
# pipeline options and the I/O connectors change
import json

import apache_beam as beam
from apache_beam.transforms import window

# input_topic, output_table, output_schema, extract_user_data and
# format_result are assumed to be defined elsewhere in the module
def create_pipeline(pipeline_options):
    with beam.Pipeline(options=pipeline_options) as pipeline:
        events = (pipeline
                 | 'Read Events' >> beam.io.ReadFromPubSub(topic=input_topic)
                 | 'Parse JSON' >> beam.Map(json.loads)
                 # extract_user_data should emit (user_id, 1) pairs so the
                 # per-key combine below can sum them
                 | 'Extract User Events' >> beam.Map(extract_user_data)
                 | 'Window into Fixed Windows' >> beam.WindowInto(
                     window.FixedWindows(60))  # 1-minute windows
                 | 'Count by User' >> beam.CombinePerKey(sum)
                 | 'Format Output' >> beam.Map(format_result)
                 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                     table=output_table,
                     schema=output_schema))
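
The block above reads from Pub/Sub, so it runs as a streaming job. Because Beam's model is unified, the same transforms can be fed from a bounded source for batch runs. A minimal sketch, assuming newline-delimited JSON files and the same helper names as above (bucket path is illustrative):

# Hypothetical batch variant of the same pipeline: only the source changes
def create_batch_pipeline(pipeline_options):
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (pipeline
         | 'Read Events' >> beam.io.ReadFromText('gs://my-data-bucket/events/*.json')
         | 'Parse JSON' >> beam.Map(json.loads)
         | 'Extract User Events' >> beam.Map(extract_user_data)
         | 'Count by User' >> beam.CombinePerKey(sum)
         | 'Format Output' >> beam.Map(format_result)
         | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
             table=output_table,
             schema=output_schema))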

Key Components

  • Apache Beam SDK: Unified programming model
  • Dataflow Runner: Execution engine on Google Cloud
  • Auto-scaling: Automatic scaling of workers
  • Shuffle Service: Managed service for shuffle operations (see the options sketch after this list for how these components map to pipeline flags)

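As a rough sketch, these managed components are switched on through pipeline options; project, region and bucket names here are placeholders:

from apache_beam.options.pipeline_options import PipelineOptions

# Batch job opting in to the managed Dataflow Shuffle, with capped autoscaling
batch_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-gcp-project',
    '--region=us-central1',
    '--temp_location=gs://my-temp-bucket/temp',
    '--experiments=shuffle_mode=service',  # managed Shuffle Service (default in many regions)
    '--max_num_workers=20'                 # upper bound for auto-scaling
])

# Streaming job with the Streaming Engine enabled
streaming_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-gcp-project',
    '--region=us-central1',
    '--streaming',
    '--enable_streaming_engine'
])
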
🚀 Streaming Use Cases

1. Real-time Analytics Dashboard

import json

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.options.pipeline_options import PipelineOptions

class CalculateMetrics(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        # element is ((page, country), count), as produced by the combine below
        (page, country), count = element
        timestamp = window.end.to_utc_datetime()

        yield {
            'timestamp': timestamp.isoformat(),
            'metric_name': 'page_views',
            'value': count,
            'dimensions': {
                'page': page,
                'country': country
            }
        }

def run_streaming_pipeline():
    options = PipelineOptions([
        '--project=my-gcp-project',
        '--runner=DataflowRunner',
        '--streaming',
        '--region=us-central1'
    ])

    with beam.Pipeline(options=options) as pipeline:
        (pipeline
         | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
             subscription='projects/my-project/subscriptions/web-events')
         | 'Parse JSON' >> beam.Map(json.loads)
         | 'Add Timestamp' >> beam.Map(
             # event payloads are assumed to carry a Unix timestamp in seconds
             lambda x: beam.window.TimestampedValue(x, x['timestamp']))
         | 'Window into 1min' >> beam.WindowInto(
             window.FixedWindows(60),
             allowed_lateness=30)
         | 'Key by Page+Country' >> beam.Map(
             lambda x: ((x['page'], x['country']), 1))
         | 'Count Events' >> beam.CombinePerKey(sum)
         | 'Calculate Metrics' >> beam.ParDo(CalculateMetrics())
         | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
             table='analytics.real_time_metrics',
             schema={
                 'fields': [
                     {'name': 'timestamp', 'type': 'TIMESTAMP'},
                     {'name': 'metric_name', 'type': 'STRING'},
                     {'name': 'value', 'type': 'INTEGER'},
                     {'name': 'dimensions', 'type': 'RECORD', 'mode': 'NULLABLE',
                      'fields': [
                          {'name': 'page', 'type': 'STRING'},
                          {'name': 'country', 'type': 'STRING'}
                      ]}
                 ]
             },
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

2. Stream Processing with Side Inputs

class EnrichWithUserData(beam.DoFn):
    def process(self, element, user_data):
        user_id = element['user_id']
        user_info = user_data.get(user_id, {})

        enriched = {
            **element,
            'user_segment': user_info.get('segment', 'unknown'),
            'user_country': user_info.get('country', 'unknown'),
            'user_registration_date': user_info.get('registration_date')
        }

        yield enriched

def create_streaming_pipeline_with_side_inputs(pipeline):
    # `pipeline` is an already-constructed beam.Pipeline
    # Main stream
    main_stream = (pipeline
                  | 'Read Main Stream' >> beam.io.ReadFromPubSub(
                      topic='projects/my-project/topics/user-events'))

    # Side input: user data from BigQuery (a bounded side input is read
    # once when the streaming pipeline starts)
    user_data = (pipeline
                | 'Read User Data' >> beam.io.ReadFromBigQuery(
                    query="SELECT user_id, segment, country, registration_date FROM users.profiles")
                | 'Create User Dict' >> beam.Map(
                    lambda row: (row['user_id'], row))
                | 'Convert to Dict' >> beam.combiners.ToDict())

    # Enrich stream with side input
    enriched_stream = (main_stream
                      | 'Parse Events' >> beam.Map(json.loads)
                      | 'Enrich with User Data' >> beam.ParDo(
                          EnrichWithUserData(),
                          user_data=beam.pvalue.AsDict(user_data)))
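
beam.pvalue.AsDict materializes the whole lookup table as a dictionary on every worker, which is convenient for small to medium reference data. For very large or frequently changing lookup tables, an external store queried from the DoFn (for example Bigtable or Memorystore) is usually a better fit.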

📊 Batch Processing Use Cases

1. Large-Scale ETL from Cloud Storage

import logging
from datetime import datetime

class TransformRecord(beam.DoFn):
    def process(self, element):
        # Parse CSV row
        fields = element.split(',')

        if len(fields) < 5:
            return  # Skip invalid records

        try:
            yield {
                'transaction_id': fields[0],
                'customer_id': int(fields[1]),
                'product_id': fields[2],
                'amount': float(fields[3]),
                'transaction_date': fields[4],
                'processed_timestamp': datetime.utcnow().isoformat()
            }
        except (ValueError, IndexError):
            # Log error and continue
            logging.error(f"Failed to parse record: {element}")

# validate_record and transaction_schema are assumed to be defined elsewhere
def run_batch_etl():
    options = PipelineOptions([
        '--project=my-gcp-project',
        '--runner=DataflowRunner',
        '--region=us-central1',
        '--temp_location=gs://my-temp-bucket/temp'
    ])

    with beam.Pipeline(options=options) as pipeline:
        (pipeline
         | 'Read CSV Files' >> beam.io.ReadFromText(
             'gs://my-data-bucket/transactions/*.csv',
             skip_header_lines=1)
         | 'Transform Records' >> beam.ParDo(TransformRecord())
         | 'Validate Data' >> beam.Filter(validate_record)
         | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
             table='warehouse.transactions',
             schema=transaction_schema,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

2. Processing with Complex Windows

from apache_beam.transforms import window

# pipeline_options and AnalyzeSession are assumed to be defined elsewhere;
# each sensor event is assumed to carry 'sensor_id', 'value' and
# 'event_timestamp' fields
def advanced_windowing_pipeline():
    with beam.Pipeline(options=pipeline_options) as pipeline:
        events = (pipeline
                 | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
                     topic='projects/my-project/topics/sensor-data')
                 | 'Parse JSON' >> beam.Map(json.loads)
                 | 'Add Event Time' >> beam.Map(
                     lambda x: beam.window.TimestampedValue(
                         x, x['event_timestamp'])))

        # Fixed 5-minute windows: average reading per sensor
        fixed_window_metrics = (events
                               | 'Fixed Window 5min' >> beam.WindowInto(
                                   window.FixedWindows(5 * 60))
                               | 'Key by Sensor' >> beam.Map(
                                   lambda x: (x['sensor_id'], x['value']))
                               | 'Calculate Avg' >> beam.CombinePerKey(
                                   beam.combiners.MeanCombineFn()))

        # Sliding 10-minute windows every 2 minutes: event count per sensor
        sliding_window_metrics = (events
                                 | 'Sliding Window' >> beam.WindowInto(
                                     window.SlidingWindows(10 * 60, 2 * 60))
                                 | 'Extract Sensor Id' >> beam.Map(
                                     lambda x: x['sensor_id'])
                                 | 'Count per Window' >> beam.combiners.Count.PerElement())

        # Session windows (30-second gap): analyze each sensor session
        session_metrics = (events
                          | 'Session Window' >> beam.WindowInto(
                              window.Sessions(30))
                          | 'Key by Sensor for Sessions' >> beam.Map(
                              lambda x: (x['sensor_id'], x))
                          | 'Group Sessions' >> beam.GroupByKey()
                          | 'Session Analysis' >> beam.ParDo(AnalyzeSession()))

⚙️ Optimization and Tuning

Worker Configuration

# Options tuned for different workload types
def get_pipeline_options(workload_type):
    base_options = [
        '--project=my-gcp-project',
        '--runner=DataflowRunner',
        '--region=us-central1'
    ]

    if workload_type == 'memory_intensive':
        return PipelineOptions(base_options + [
            '--machine_type=n1-highmem-4',
            '--max_num_workers=20',
            '--disk_size_gb=100'
        ])

    elif workload_type == 'cpu_intensive':
        return PipelineOptions(base_options + [
            '--machine_type=n1-highcpu-16',
            '--max_num_workers=50',
            '--num_workers=10'
        ])

    elif workload_type == 'streaming':
        return PipelineOptions(base_options + [
            '--streaming',
            '--enable_streaming_engine',
            '--max_num_workers=10',
            '--autoscaling_algorithm=THROUGHPUT_BASED'
        ])
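
A minimal usage sketch for this helper, assuming a pipeline like the streaming example above is built inside the with-block:

# Pick options for the workload and run the pipeline with them
options = get_pipeline_options('streaming')
with beam.Pipeline(options=options) as pipeline:
    # build the streaming pipeline here, e.g. the Pub/Sub -> BigQuery flow above
    pass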

Custom Metrics and Monitoring

import time

from apache_beam.metrics import Metrics

class CustomMetricsDoFn(beam.DoFn):
    def __init__(self):
        self.error_counter = Metrics.counter('pipeline', 'processing_errors')
        self.success_counter = Metrics.counter('pipeline', 'successful_records')
        self.processing_time = Metrics.distribution('pipeline', 'processing_time_ms')

    def process(self, element):
        start_time = time.time()

        try:
            # transform_element is a placeholder for the actual business logic
            result = self.transform_element(element)
            self.success_counter.inc()

            processing_time_ms = (time.time() - start_time) * 1000
            self.processing_time.update(processing_time_ms)

            yield result

        except Exception as e:
            self.error_counter.inc()
            logging.error(f"Processing error: {e}")

💰 Cost Optimization

Cost-Saving Strategies

# Batch: use Dataflow FlexRS, which schedules work on preemptible VMs,
# for fault-tolerant workloads that can tolerate delayed scheduling
cost_optimized_batch_options = PipelineOptions([
    '--flexrs_goal=COST_OPTIMIZED',
    '--max_num_workers=5'              # cap scaling
])

# Streaming: the Streaming Engine offloads shuffle and state from workers,
# so smaller and fewer workers are needed
cost_optimized_streaming_options = PipelineOptions([
    '--enable_streaming_engine',
    '--max_num_workers=5',
    '--autoscaling_algorithm=THROUGHPUT_BASED'
])

# Dataflow shuts workers down automatically when a batch job finishes or a
# streaming job is drained; these options package dependencies so jobs start
# (and therefore finish) faster
def create_pipeline_with_shutdown():
    options = PipelineOptions([
        '--save_main_session',
        '--setup_file=./setup.py',
        '--experiments=use_beam_bq_sink'
    ])
    return options

Cost Monitoring

import time

from google.cloud import monitoring_v3

# PROJECT_ID is assumed to be defined elsewhere
def monitor_dataflow_costs():
    client = monitoring_v3.MetricServiceClient()
    project_name = f"projects/{PROJECT_ID}"

    # Query vCPU usage per job over the last hour as a cost proxy
    interval = monitoring_v3.TimeInterval({
        "end_time": {"seconds": int(time.time())},
        "start_time": {"seconds": int(time.time() - 3600)}  # Last hour
    })

    results = client.list_time_series(
        request={
            "name": project_name,
            "filter": ('resource.type="dataflow_job" AND '
                       'metric.type="dataflow.googleapis.com/job/total_vcpu_time"'),
            "interval": interval,
            "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL
        }
    )

    for result in results:
        print(f"Job: {result.resource.labels['job_name']}")
        print(f"vCPU-seconds (cost proxy): {result.points[0].value.double_value}")

📚 Best Practices

Performance

  • Use the Streaming Engine for streaming workloads
  • Implement side inputs efficiently
  • Tune windowing to the use case
  • Watch for hot keys in GroupBy/Combine operations (see the sketch after this list)
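
When one key receives a disproportionate share of the traffic, a single worker becomes the bottleneck. A minimal sketch of mitigating this with Beam's hot-key fanout on a combine; `events` and the 'page' field are illustrative assumptions:

# Spread the pre-combine work for hot keys across 16 intermediate shards
page_view_counts = (
    events
    | 'Key by Page' >> beam.Map(lambda e: (e['page'], 1))
    | 'Count with Fanout' >> beam.CombinePerKey(sum).with_hot_key_fanout(16))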

Development

  • Test pipelines locally first (see the sketch after this list)
  • Use templates for reuse
  • Implement robust error handling
  • Version pipelines so you can roll back
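
A minimal sketch of a local unit test for a transform, using Beam's testing utilities on the DirectRunner (the transform under test is illustrative):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

def test_count_by_page():
    events = [{'page': '/home'}, {'page': '/home'}, {'page': '/pricing'}]

    # TestPipeline runs on the local DirectRunner, so no GCP resources are needed
    with TestPipeline() as p:
        counts = (p
                  | beam.Create(events)
                  | beam.Map(lambda e: (e['page'], 1))
                  | beam.CombinePerKey(sum))

        assert_that(counts, equal_to([('/home', 2), ('/pricing', 1)]))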

Operations

  • Set up failure alerts
  • Use dead-letter queues for bad records (see the sketch after this list)
  • Implement checkpoints for recovery
  • Monitor lag metrics on streaming jobs
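
A minimal sketch of a dead-letter pattern using tagged outputs: records that fail to parse are routed to a side output and persisted for later inspection, instead of failing the bundle. Topic and table names are illustrative assumptions:

import json
import apache_beam as beam

class ParseOrDeadLetter(beam.DoFn):
    def process(self, element):
        try:
            yield json.loads(element)
        except Exception:
            # Route bad records to a separate output instead of raising
            yield beam.pvalue.TaggedOutput(
                'dead_letter', {'raw': element.decode('utf-8', errors='replace')})

def build(pipeline):
    parsed = (pipeline
              | 'Read' >> beam.io.ReadFromPubSub(
                  topic='projects/my-project/topics/web-events')
              | 'Parse' >> beam.ParDo(ParseOrDeadLetter()).with_outputs(
                  'dead_letter', main='valid'))

    # parsed.valid continues into the normal pipeline;
    # parsed.dead_letter is stored for inspection and replay
    _ = (parsed.dead_letter
         | 'Write Dead Letters' >> beam.io.WriteToBigQuery(
             table='monitoring.dead_letter_records',
             schema={'fields': [{'name': 'raw', 'type': 'STRING'}]},
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    return parsed.valid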

Dataflow lets you process data at Google scale! 🚀