2025-05-17 11:36:26 -04:00

1441 lines
48 KiB
Go

// internal/kafka/consumer.go
package kafka
import (
"context"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"consumer/internal/database"
"consumer/internal/logging"
"consumer/internal/models"
"consumer/internal/models/entidad_db"
"consumer/internal/notifier"
soapClient "consumer/internal/soap"
"consumer/internal/utils"
"consumer/internal/utils/algoritmo"
"consumer/internal/utils/generadorFirmaDigital"
//"github.com/google/uuid"
kafka "github.com/segmentio/kafka-go"
)
// -----------------------------------------------------------------------------
// Constants
// -----------------------------------------------------------------------------
const (
defaultMaxRetries = 3 // 4 reintentos 0,1,2,3
// Retry delay
retryDelay = 5 * time.Second // Tiempo de espera entre reintentos
defaultWorkerCount = 10
defaultChannelBuffer = 1000
// Timeouts
dbQueryTimeout = 5 * time.Second // Reducir de 10s a 5s
soapCallTimeout = 20 * time.Second // Reducir de 30s a 20s
kafkaCommitTimeout = 3 * time.Second // Reducir de 5s a 3s
workerProcessTimeout = 20 * time.Second // Reducir de 30s a 20s
// Sleep durations
kafkaReconnectDelay = 1 * time.Second // Reducir de 2s a 1s
healthCheckInterval = 15 * time.Second // Reducir de 30s a 15s
)
// -----------------------------------------------------------------------------
// Message Types
// -----------------------------------------------------------------------------
type KafkaMessage struct {
TransaccionID string `json:"transaccion_id"`
Estado string `json:"estado"`
NumeroFactura string `json:"numero_factura"`
}
type ResultMessage struct {
TransaccionID string `json:"transaccion_id"`
Estado string `json:"estado"`
NumeroFactura string `json:"numero_factura"`
CodigoAutorizacion string `json:"codigo_autorizacion,omitempty"`
MensajeError string `json:"mensaje_error,omitempty"`
}
// -----------------------------------------------------------------------------
// Retry Timers and Processing Locks
// -----------------------------------------------------------------------------
var (
retryTimers = make(map[string]*time.Timer)
retryTimersMtx sync.Mutex
// Mecanismo de bloqueo para prevenir procesamiento concurrente
processingLock sync.Mutex
processingIDs = make(map[string]bool)
)
// -----------------------------------------------------------------------------
// Kafka Consumer
// -----------------------------------------------------------------------------
type KafkaConsumer struct {
factReader atomic.Value // *kafka.Reader
estadoReader atomic.Value // *kafka.Reader
anularReader atomic.Value // *kafka.Reader
resultWriter *kafka.Writer
dlqWriter *kafka.Writer
dbManager *database.DBManager
soapClient *soapClient.SOAPClient
notifier *notifier.Notifier
logger *logging.LoggerSystem
maxRetries int
// Para manejar shutdown
shutdownTimers sync.WaitGroup
}
// NewKafkaConsumer crea una nueva instancia del consumidor Kafka
func NewKafkaConsumer(
brokers []string,
groupID, topicFact, topicEst, topicAnular, resTopic, dlqTopic string,
dbManager *database.DBManager,
soapClient *soapClient.SOAPClient,
notifier *notifier.Notifier,
logger *logging.LoggerSystem,
) *KafkaConsumer {
kc := &KafkaConsumer{
resultWriter: kafka.NewWriter(kafka.WriterConfig{
Brokers: brokers,
Topic: resTopic,
Balancer: &kafka.LeastBytes{},
BatchSize: 1,
Async: false,
}),
dlqWriter: kafka.NewWriter(kafka.WriterConfig{
Brokers: brokers,
Topic: dlqTopic,
Balancer: &kafka.LeastBytes{},
BatchSize: 1,
Async: false,
}),
dbManager: dbManager,
soapClient: soapClient,
notifier: notifier,
logger: logger,
maxRetries: defaultMaxRetries,
}
// Initialize atomic readers
factReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID + "-fact",
Topic: topicFact,
MinBytes: 1,
MaxBytes: 10e6,
MaxWait: 100 * time.Millisecond,
ReadBackoffMin: 100 * time.Millisecond,
ReadBackoffMax: 5 * time.Second,
})
kc.factReader.Store(factReader)
estReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID + "-estado",
Topic: topicEst,
MinBytes: 1,
MaxBytes: 10e6,
MaxWait: 100 * time.Millisecond,
ReadBackoffMin: 100 * time.Millisecond,
ReadBackoffMax: 5 * time.Second,
})
kc.estadoReader.Store(estReader)
// Add reader for ANULAR topic
anularReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID + "-anular",
Topic: topicAnular,
MinBytes: 1,
MaxBytes: 10e6,
MaxWait: 100 * time.Millisecond,
ReadBackoffMin: 100 * time.Millisecond,
ReadBackoffMax: 5 * time.Second,
})
kc.anularReader.Store(anularReader)
return kc
}
// -----------------------------------------------------------------------------
// Control de acceso concurrente
// -----------------------------------------------------------------------------
// acquireProcessingLock intenta adquirir un bloqueo para procesar un ID específico
// Retorna true si el bloqueo fue adquirido, false si ya está siendo procesado
func (kc *KafkaConsumer) acquireProcessingLock(id string) bool {
processingLock.Lock()
defer processingLock.Unlock()
if processingIDs[id] {
return false
}
processingIDs[id] = true
return true
}
// releaseProcessingLock libera el bloqueo para un ID específico
func (kc *KafkaConsumer) releaseProcessingLock(id string) {
processingLock.Lock()
defer processingLock.Unlock()
delete(processingIDs, id)
}
// cancelarReintentosPendientes cancela cualquier reintento programado para una factura
func (kc *KafkaConsumer) cancelarReintentosPendientes(id string) {
retryTimersMtx.Lock()
defer retryTimersMtx.Unlock()
if timer, exists := retryTimers[id]; exists {
timer.Stop()
delete(retryTimers, id)
kc.logger.Info.Printf("[ID:%s] Cancelados reintentos pendientes", id)
}
}
// Método helper para registrar interacciones de forma consistente
func (kc *KafkaConsumer) registrarInteraccion(ctx context.Context, id, tipoServicio, reqBody, respBody string,
statusCode int, dur int64, exitoso bool, vMsgRespSoap string) error {
return kc.dbManager.RegistrarInteraccionServicio(
ctx, id, tipoServicio, kc.soapClient.SoapURL,
reqBody, respBody, statusCode, dur, exitoso, vMsgRespSoap)
}
func (kc *KafkaConsumer) Close() {
// Cancelar todos los timers pendientes
retryTimersMtx.Lock()
for id, timer := range retryTimers {
timer.Stop()
delete(retryTimers, id)
}
retryTimersMtx.Unlock()
// Esperar a que todas las goroutines de reintento terminen
kc.shutdownTimers.Wait()
if reader := kc.factReader.Load(); reader != nil {
reader.(*kafka.Reader).Close()
}
if reader := kc.estadoReader.Load(); reader != nil {
reader.(*kafka.Reader).Close()
}
if reader := kc.anularReader.Load(); reader != nil {
reader.(*kafka.Reader).Close()
}
kc.resultWriter.Close()
kc.dlqWriter.Close()
kc.logger.Info.Println("Cerrando KafkaConsumer")
}
// -----------------------------------------------------------------------------
// Message Processing
// -----------------------------------------------------------------------------
// procesarFactura procesa una factura, evitando el bucle infinito y el procesamiento duplicado
func (kc *KafkaConsumer) procesarFactura(ctx context.Context, evt models.FacturaEventModel) error {
id := evt.TransaccionID
// Aplicar bloqueo para evitar procesamiento concurrente del mismo ID
if !kc.acquireProcessingLock(id) {
kc.logger.Info.Printf("[ID:%s] Ya está siendo procesada por otro worker, ignorando", id)
return nil
}
defer kc.releaseProcessingLock(id)
kc.logger.Info.Printf("[ID:%s] Iniciando procesamiento de factura", id)
// Paso 1: Verificar si ya fue procesada y enviada a DLQ
inDLQ, err := kc.dbManager.IsFacturaInDLQ(ctx, id)
if err != nil {
return fmt.Errorf("error verificando DLQ: %w", err)
}
if inDLQ {
kc.logger.Info.Printf("[ID:%s] Ya fue procesada anteriormente y enviada a DLQ, omitiendo", id)
kc.cancelarReintentosPendientes(id)
return database.ErrAlreadyProcessed
}
// NUEVO: Verificar si la factura ya está en estado FINALIZADA
exists, numeroFactura, estadoActual, fechaEmision, err := kc.dbManager.VerificarExistenciaFactura(ctx, id)
if err != nil {
return fmt.Errorf("error verificando factura: %w", err)
}
if exists && estadoActual == "FINALIZADA" && !strings.EqualFold(evt.Estado, "REPROCESAR") {
kc.logger.Info.Printf("[ID:%s] Ya está FINALIZADA, ignorando procesamiento duplicado", id)
// Cancelar cualquier reintento pendiente
kc.cancelarReintentosPendientes(id)
// Enviar mensaje de resultado confirmando estado
res := ResultMessage{
TransaccionID: id,
Estado: "FINALIZADA",
NumeroFactura: numeroFactura,
}
if err := kc.resultWriter.WriteMessages(ctx,
kafka.Message{Key: []byte(id), Value: utils.MustMarshal(res)},
); err != nil {
kc.logger.Error.Printf("[ID:%s] Error enviando resultado de confirmación: %v", id, err)
}
return nil
}
// Paso 2: Obtener datos de empresa
nit := evt.Payload.Cabecera.NitEmisor
codigoSucursal := evt.Payload.Cabecera.CodigoSucursal
// nit de tipo 2 ofuscar
if evt.Payload.Cabecera.CodigoMetodoPago == 2 {
// Aquí iría la lógica de ofuscación si es necesaria
}
if evt.Payload.Cabecera.CodigoPuntoVenta.Value == nil {
kc.logger.Error.Printf("[ID:%s] CodigoPuntoVenta es nulo: %w", id, database.ErrNilPointer)
return fmt.Errorf("CodigoPuntoVenta es nulo: %w", database.ErrNilPointer)
}
codigoPuntoVenta := *evt.Payload.Cabecera.CodigoPuntoVenta.Value
datosFactura, err := kc.dbManager.GetDatosFacturaEmpresa(ctx, nit, codigoSucursal, codigoPuntoVenta)
if err != nil {
kc.logger.Error.Printf("[ID:%s] error obteniendo datos empresa: %w", id, err)
return fmt.Errorf("error obteniendo datos empresa: %w", err)
}
// Paso 3: Obtener CUIS y CUFD
_, cufd, codControl, err := kc.dbManager.GetCUISDatosFactura(ctx, datosFactura.RegistroEmpresaID)
if err != nil {
kc.logger.Error.Printf("[ID:%s] error obteniendo CUIS/CUFD: %w", id, err)
return fmt.Errorf("error obteniendo CUIS/CUFD: %w", err)
}
// Paso 4: Generar CUF
dCuf := evt.Payload.Cabecera
// Formatear fecha para CUF
formatDate, err := utils.FormatFecha(dCuf.FechaEmision)
if err != nil {
kc.logger.Error.Printf("[ID:%s] error formateando fecha: %w", id, err)
return fmt.Errorf("error formateando fecha: %w", err)
}
vCuf := algoritmo.GenerateCUF(
strconv.FormatInt(dCuf.NitEmisor, 10),
formatDate,
strconv.Itoa(dCuf.CodigoSucursal),
strconv.Itoa(datosFactura.CodigoModalidad),
strconv.Itoa(dCuf.TipoEmision),
strconv.Itoa(dCuf.TipoFactura),
strconv.Itoa(dCuf.CodigoDocumentoSector),
dCuf.NumeroFactura,
strconv.Itoa(*dCuf.CodigoPuntoVenta.Value),
codControl,
)
// Actualizar CUF y CUFD en la cabecera
evt.Payload.Cabecera.Cuf = vCuf
evt.Payload.Cabecera.Cufd = cufd
// Paso 5: Verificar existencia de la factura y obtener datos
// (Ya tenemos estos datos de la verificación anterior)
prevEstado := estadoActual
// Construir la factura electrónica
fact := models.FacturaElectronicaCompraVenta{
Xsi: "http://www.w3.org/2001/XMLSchema-instance",
NoNamespaceSchemaLocation: "facturaElectronicaCompraVenta.xsd",
Cabecera: evt.Payload.Cabecera,
Detalle: evt.Payload.Detalle,
}
// Serializar a XML con indentación
vFactura, err := xml.MarshalIndent(fact, "", " ")
if err != nil {
kc.logger.Error.Printf("[ID:%s] error al serializar XML: %v", id, err)
return fmt.Errorf("error al serializar XML: %v", err)
}
kc.logger.Info.Printf("[ID:%s] XML factura creado", id)
// Si la factura no existe, crearla
if !exists {
if strings.EqualFold(evt.Estado, "REPROCESAR") {
kc.logger.Info.Printf("[ID:%s] factura %s no existe para reprocesar", id)
return fmt.Errorf("factura %s no existe para reprocesar", id)
}
// Valores por defecto para nueva factura
numeroFactura = evt.Payload.Cabecera.NumeroFactura
fechaEmision = utils.NowInBolivia()
prevEstado = "PENDIENTE"
vUrlFact, err := utils.ConstruirURLSIAT(numeroFactura, vCuf)
if err != nil {
kc.logger.Error.Printf("[ID:%s] Error al concatenar la url del siat: %w", id, err)
return fmt.Errorf("Error al concatenar la url del siat: %w", err)
}
// Crear la factura
if err := kc.dbManager.CrearFacturaPendiente(ctx, id,
fechaEmision, prevEstado, vCuf, vUrlFact, datosFactura.RegistroEmpresaID, evt.Payload.Cabecera.CodigoDocumentoSector, string(vFactura)); err != nil {
return fmt.Errorf("error creando factura: %w", err)
}
}
// Paso 6: Procesar según el estado
if strings.EqualFold(evt.Estado, "REPROCESAR") {
// Revertir la factura
if err := kc.dbManager.ActualizarEstadoFactura(ctx, id, "REVERTIDA", "", prevEstado,
"Factura revertida por solicitud", ""); err != nil {
kc.logger.Error.Printf("[ID:%s] error revirtiendo factura: %w", id, err)
return fmt.Errorf("error revirtiendo factura: %w", err)
}
// Enviar mensaje de resultado
res := ResultMessage{
TransaccionID: id,
Estado: "REVERTIDA",
NumeroFactura: numeroFactura,
}
if err := kc.resultWriter.WriteMessages(ctx,
kafka.Message{Key: []byte(id), Value: utils.MustMarshal(res)},
); err != nil {
return fmt.Errorf("error enviando resultado: %w", err)
}
kc.logger.Info.Printf("[ID:%s] Factura revertida exitosamente", id)
return nil
}
// Paso 7: Generar el XML, firmarlo y enviarlo a SIAT
// Serializar a XML
xmlBytes, err := xml.MarshalIndent(fact, "", " ")
if err != nil {
kc.logger.Error.Printf("[ID:%s] error serializando XML: %w", id, err)
return fmt.Errorf("error serializando XML: %w", err)
}
xmlBytes = append([]byte(xml.Header), xmlBytes...)
// Rutas de certificados
privKeyPath := filepath.Join("internal", "utils", "generadorFirmaDigital", "certificados", datosFactura.NombreArchivoClavePrivada)
certPath := filepath.Join("internal", "utils", "generadorFirmaDigital", "certificados", datosFactura.NombreArchivoCertificado)
priv, err := generadorFirmaDigital.CargarLlavePrivada(privKeyPath)
if err != nil {
kc.logger.Error.Printf("[ID:%s] error cargando llave privada: %w", id, err)
return fmt.Errorf("error cargando llave privada: %w", err)
}
//certificado
cert, err := generadorFirmaDigital.CargarCertificado(certPath)
if err != nil {
kc.logger.Error.Printf("[ID:%s] error cargando certificado: %w", id, err)
return fmt.Errorf("error cargando certificado: %w", err)
}
// Firmar XML
firmado, err := generadorFirmaDigital.FirmarXML(xmlBytes, priv, cert)
if err != nil {
kc.logger.Error.Printf("[ID:%s] error firmando XML: %w", id, err)
return fmt.Errorf("error firmando XML: %w", err)
}
// Calcular hash
hashArchivo := generadorFirmaDigital.Sha256Hex(firmado)
// Comprimir y codificar
gz64, err := generadorFirmaDigital.GzipBase64(firmado)
if err != nil {
kc.logger.Error.Printf("[ID:%s] error comprimiendo XML: %w", id, err)
return fmt.Errorf("error comprimiendo XML: %w", err)
}
// Paso 8: Enviar a SIAT con reintentos - MEJORADO
return kc.procesarConReintentosInicial(ctx, evt, id, firmado, gz64, hashArchivo, datosFactura, numeroFactura, prevEstado)
}
// procesarConReintentosInicial es el punto de entrada para el procesamiento inicial y reintentos
// MEJORADO: Evita bucle infinito y solo hace el primer intento
func (kc *KafkaConsumer) procesarConReintentosInicial(ctx context.Context, evt models.FacturaEventModel,
id string, firmado []byte, gz64 string, hashArchivo string,
datosFactura entidad_db.DatosFactura, numeroFactura string, prevEstado string) error {
var lastErr error
kc.logger.Info.Printf("[ID:%s][Attempt:0] Iniciando intento inicial de envío a SIAT", id)
// Solo realizar el intento inicial (intento 0)
// Crear un contexto con timeout para la llamada SOAP
callCtx, cancel := context.WithTimeout(ctx, soapCallTimeout)
soapResp, statusCode, reqBody, respBody, dur, err := kc.soapClient.CallService(
callCtx, datosFactura, gz64, hashArchivo)
cancel()
kc.logger.Info.Printf("[ID:%s][Attempt:0] Respuesta SOAP: %s", id, soapResp.CodigoDescripcion)
estadoSoap := soapResp.CodigoDescripcion
//MensajesList
mensajesJSON, jsonErr := json.Marshal(soapResp.MensajesList)
if jsonErr != nil {
kc.logger.Error.Printf("[ID:%s][Attempt:0] Error al obtener el mensaje del servicio SOAP: %v", id, jsonErr)
mensajesJSON = []byte("{}")
}
// IMPORTANTE: Registrar la interacción independientemente del resultado
regErr := kc.registrarInteraccion(ctx, id, "RECEPCION_FACTURA",
reqBody, respBody, statusCode, dur, err == nil && soapResp.Transaccion, string(mensajesJSON))
if regErr != nil {
kc.logger.Error.Printf("[ID:%s][Attempt:0] Error registrando interacción: %v", id, regErr)
} else {
kc.logger.Info.Printf("[ID:%s][Attempt:0] Interacción registrada", id)
}
// Manejar errores
if err != nil || !soapResp.Transaccion {
if err != nil {
lastErr = err
kc.logger.Error.Printf("[ID:%s][Attempt:0] Error llamando SIAT: %v", id, err)
} else {
lastErr = fmt.Errorf("SIAT error: %s", soapResp.CodigoDescripcion)
kc.logger.Error.Printf("[ID:%s][Attempt:0] SIAT rechazó la transacción: %s", id, soapResp.CodigoDescripcion)
}
// Programar el primer reintento asíncrono y terminar
kc.programarReintento(evt, id, firmado, gz64, hashArchivo, datosFactura, 1,
estadoSoap, numeroFactura, prevEstado)
return nil
}
// Transacción exitosa - actualizar la factura
kc.logger.Info.Printf("[ID:%s][Attempt:0] Transacción exitosa, actualizando factura", id)
if err := kc.dbManager.ActualizarEstadoFactura(ctx, id, "FINALIZADA", "",
prevEstado, fmt.Sprintf("Factura finalizada: %v", soapResp.Transaccion), string(firmado)); err != nil {
lastErr = fmt.Errorf("error actualizando factura finalizada: %w", err)
kc.logger.Error.Printf("[ID:%s][Attempt:0] Error actualizando factura finalizada: %v", id, err)
// Programar reintento
kc.programarReintento(evt, id, firmado, gz64, hashArchivo, datosFactura, 1,
estadoSoap, numeroFactura, prevEstado)
return nil
}
// Cancelar cualquier reintento pendiente
kc.cancelarReintentosPendientes(id)
// Enviar mensaje de resultado exitoso
res := ResultMessage{
TransaccionID: id,
Estado: "FINALIZADA",
NumeroFactura: numeroFactura,
CodigoAutorizacion: soapResp.CodigoEstado,
}
if err := kc.resultWriter.WriteMessages(ctx,
kafka.Message{Key: []byte(id), Value: utils.MustMarshal(res)},
); err != nil {
return fmt.Errorf("error enviando resultado: %w", err)
}
kc.logger.Info.Printf("[ID:%s][Attempt:0] Factura finalizada exitosamente", id)
return lastErr
}
// programa un reintento asíncrono evitando llamadas recursivas
func (kc *KafkaConsumer) programarReintento(evt models.FacturaEventModel, id string,
firmado []byte, gz64 string, hashArchivo string, datosFactura entidad_db.DatosFactura,
attempt int, estadoSoapAnterior string, numeroFactura string, prevEstado string) {
kc.logger.Info.Printf("[ID:%s][Attempt:%d] Programando reintento en %v", id, attempt, retryDelay)
retryTimersMtx.Lock()
// Cancelar timer anterior si existe
if oldTimer, exists := retryTimers[id]; exists {
oldTimer.Stop()
kc.logger.Info.Printf("[ID:%s] Timer anterior cancelado", id)
}
// Incrementar contador para esperar goroutines durante el cierre
kc.shutdownTimers.Add(1)
// Crear nuevo timer
retryTimers[id] = time.AfterFunc(retryDelay, func() {
defer kc.shutdownTimers.Done()
// Remover el timer del mapa cuando se ejecuta
retryTimersMtx.Lock()
delete(retryTimers, id)
retryTimersMtx.Unlock()
// Crear un nuevo contexto para el reintento
newCtx, cancel := context.WithTimeout(context.Background(), workerProcessTimeout)
defer cancel()
kc.logger.Info.Printf("[ID:%s][Attempt:%d] Ejecutando reintento programado", id, attempt)
// Ejecutar el reintento
kc.ejecutarReintento(newCtx, evt, id, firmado, gz64, hashArchivo, datosFactura, attempt,
estadoSoapAnterior, numeroFactura, prevEstado)
})
retryTimersMtx.Unlock()
}
// maneja un reintento específico con verificaciones mejoradas
func (kc *KafkaConsumer) ejecutarReintento(ctx context.Context, evt models.FacturaEventModel,
id string, firmado []byte, gz64 string, hashArchivo string, datosFactura entidad_db.DatosFactura,
attempt int, estadoSoapAnterior string, numeroFactura string, prevEstado string) {
// No necesitamos adquirir el bloqueo global porque cada reintento es independiente
// y ya verificamos el estado actual de la factura
// IMPORTANTE: Verificar si la factura ya está finalizada antes de ejecutar cualquier reintento
exists, _, currentEstado, _, err := kc.dbManager.VerificarExistenciaFactura(ctx, id)
if err != nil {
kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error verificando estado actual: %v", id, attempt, err)
} else if exists && currentEstado == "FINALIZADA" {
kc.logger.Info.Printf("[ID:%s][Attempt:%d] Ignorando reintento porque ya está FINALIZADA", id, attempt)
// Eliminar cualquier timer pendiente
kc.cancelarReintentosPendientes(id)
return
}
// IMPORTANTE: Verificar también si la factura ya está en la DLQ
inDLQ, err := kc.dbManager.IsFacturaInDLQ(ctx, id)
if err != nil {
kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error verificando DLQ: %v", id, attempt, err)
} else if inDLQ {
kc.logger.Info.Printf("[ID:%s][Attempt:%d] Ignorando reintento porque ya está en DLQ", id, attempt)
// Eliminar cualquier timer pendiente
kc.cancelarReintentosPendientes(id)
return
}
// IMPORTANTE: Verificar que no excedamos el número máximo de reintentos
if attempt > kc.maxRetries {
kc.logger.Error.Printf("[ID:%s][Attempt:%d] Se alcanzó el número máximo de reintentos", id, attempt)
// Manejar como error final
kc.manejarErrorFinal(ctx, evt, id, estadoSoapAnterior, numeroFactura, "Máximo de reintentos alcanzado")
return
}
var lastErr error
var estadoSoap string = estadoSoapAnterior
// Crear un contexto con timeout para la llamada SOAP
callCtx, cancel := context.WithTimeout(ctx, soapCallTimeout)
soapResp, statusCode, reqBody, respBody, dur, err := kc.soapClient.CallService(
callCtx, datosFactura, gz64, hashArchivo)
cancel()
kc.logger.Info.Printf("[ID:%s][Attempt:%d] Respuesta SOAP: %s", id, attempt, soapResp.CodigoDescripcion)
estadoSoap = soapResp.CodigoDescripcion
mensajesJSON, jsonErr := json.Marshal(soapResp.MensajesList)
if jsonErr != nil {
kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error al obtener el mensaje del servicio SOAP: %v",
id, attempt, jsonErr)
mensajesJSON = []byte("{}")
}
// IMPORTANTE: Registrar la interacción en reintentos
regErr := kc.registrarInteraccion(ctx, id, "RECEPCION_FACTURA",
reqBody, respBody, statusCode, dur, err == nil && soapResp.Transaccion, string(mensajesJSON))
if regErr != nil {
kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error registrando interacción: %v", id, attempt, regErr)
} else {
kc.logger.Info.Printf("[ID:%s][Attempt:%d] Interacción registrada", id, attempt)
}
// Manejar errores
if err != nil || !soapResp.Transaccion {
if err != nil {
lastErr = err
kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error llamando SIAT: %v", id, attempt, err)
} else {
lastErr = fmt.Errorf("SIAT error: %s", soapResp.CodigoDescripcion)
kc.logger.Error.Printf("[ID:%s][Attempt:%d] SIAT rechazó la transacción: %s",
id, attempt, soapResp.CodigoDescripcion)
}
// Si quedan reintentos, programar el siguiente
if attempt < kc.maxRetries {
kc.programarReintento(evt, id, firmado, gz64, hashArchivo, datosFactura, attempt+1,
estadoSoap, numeroFactura, prevEstado)
} else {
// No quedan reintentos, manejar como error final
errMsg := "Máximo de reintentos alcanzado"
if lastErr != nil {
errMsg = lastErr.Error()
}
kc.manejarErrorFinal(ctx, evt, id, estadoSoap, numeroFactura, errMsg)
}
return
}
// Transacción exitosa
kc.logger.Info.Printf("[ID:%s][Attempt:%d] Transacción exitosa, actualizando factura", id, attempt)
// Verificar nuevamente si la factura ya fue finalizada (puede haber ocurrido por otro proceso)
exists, _, currentEstado, _, err = kc.dbManager.VerificarExistenciaFactura(ctx, id)
if err != nil {
kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error verificando estado actual: %v", id, attempt, err)
} else if exists && currentEstado == "FINALIZADA" {
kc.logger.Info.Printf("[ID:%s][Attempt:%d] La factura ya fue FINALIZADA por otro proceso", id, attempt)
// Cancelar cualquier reintento pendiente
kc.cancelarReintentosPendientes(id)
return
}
// Actualizar la factura
if err := kc.dbManager.ActualizarEstadoFactura(ctx, id, "FINALIZADA", "",
prevEstado, fmt.Sprintf("Factura finalizada: %v", soapResp.Transaccion), string(firmado)); err != nil {
lastErr = fmt.Errorf("error actualizando factura finalizada: %w", err)
kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error actualizando factura finalizada: %v", id, attempt, err)
// Si quedan reintentos, programar el siguiente
if attempt < kc.maxRetries {
kc.programarReintento(evt, id, firmado, gz64, hashArchivo, datosFactura, attempt+1,
estadoSoap, numeroFactura, prevEstado)
} else {
// No quedan reintentos, manejar como error final
kc.manejarErrorFinal(ctx, evt, id, estadoSoap, numeroFactura, lastErr.Error())
}
return
}
// Cancelar cualquier reintento pendiente
kc.cancelarReintentosPendientes(id)
// Enviar mensaje de resultado exitoso
res := ResultMessage{
TransaccionID: id,
Estado: "FINALIZADA",
NumeroFactura: numeroFactura,
CodigoAutorizacion: soapResp.CodigoEstado,
}
if err := kc.resultWriter.WriteMessages(ctx,
kafka.Message{Key: []byte(id), Value: utils.MustMarshal(res)},
); err != nil {
kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error enviando resultado: %v", id, attempt, err)
} else {
kc.logger.Info.Printf("[ID:%s][Attempt:%d] Factura finalizada exitosamente", id, attempt)
}
}
// manejarErrorFinal gestiona el caso cuando se agotan todos los reintentos
func (kc *KafkaConsumer) manejarErrorFinal(ctx context.Context, evt models.FacturaEventModel,
id string, estadoSoap string, numeroFactura string, errMsg string) {
kc.logger.Error.Printf("[ID:%s] Todos los reintentos agotados: %s", id, errMsg)
// Cancelar cualquier reintento pendiente
kc.cancelarReintentosPendientes(id)
// Crear el mensaje JSON para DLQ
msgJson, _ := json.Marshal(evt)
// Enviar mensaje de resultado de error
if err := kc.resultWriter.WriteMessages(ctx, kafka.Message{
Key: []byte(id),
Value: utils.MustMarshal(ResultMessage{
TransaccionID: id,
Estado: "ERROR",
NumeroFactura: numeroFactura,
MensajeError: errMsg,
}),
}); err != nil {
kc.logger.Error.Printf("[ID:%s] Error enviando resultado de error: %v", id, err)
}
// Registrar en tabla facturacion_dlq
if err := kc.dbManager.InsertarEnDLQ(ctx, id, string(msgJson), estadoSoap); err != nil {
kc.logger.Error.Printf("[ID:%s] Error insertando en DLQ BD: %v", id, err)
} else {
kc.logger.Info.Printf("[ID:%s] Registrada en DLQ de base de datos", id)
}
// Verificar si ya existe en DLQ
inDLQ, err := kc.dbManager.IsFacturaInDLQ(ctx, id)
if err != nil {
kc.logger.Error.Printf("[ID:%s] Error verificando DLQ: %v", id, err)
} else if !inDLQ {
if err := kc.dlqWriter.WriteMessages(ctx, kafka.Message{
Key: []byte(id),
Value: msgJson,
}); err != nil {
kc.logger.Error.Printf("[ID:%s] Error enviando a DLQ Kafka: %v", id, err)
} else {
kc.logger.Info.Printf("[ID:%s] Enviada a DLQ Kafka", id)
}
} else {
kc.logger.Info.Printf("[ID:%s] Ya existente en DLQ, no se enviará nuevamente a Kafka DLQ", id)
}
// Enviar notificación por correo
notifyEmail := os.Getenv("NOTIFY_EMAIL")
if notifyEmail != "" {
subject := fmt.Sprintf("DLQ: %s", id)
body := fmt.Sprintf("ID: %s\nError: %s\nPayload: %s", id, errMsg, string(msgJson))
if err := kc.notifier.SendEmail(notifyEmail, subject, body); err != nil {
kc.logger.Error.Printf("[ID:%s] Error enviando notificación por correo: %v", id, err)
}
}
}
// procesarEstado procesa un mensaje de estado
func (kc *KafkaConsumer) procesarEstado(ctx context.Context, msg KafkaMessage) error {
// Validar ID
if msg.TransaccionID == "" || !utils.IsValidUUID(msg.TransaccionID) {
return database.ErrInvalidUUID
}
id := msg.TransaccionID
// Aplicar bloqueo para evitar procesamiento concurrente del mismo ID
if !kc.acquireProcessingLock(id) {
kc.logger.Info.Printf("[ID:%s] Estado: Ya está siendo procesada por otro worker, ignorando", id)
return nil
}
defer kc.releaseProcessingLock(id)
// Verificar si ya fue procesada y enviada a DLQ
inDLQ, err := kc.dbManager.IsFacturaInDLQ(ctx, id)
if err != nil {
return fmt.Errorf("error verificando DLQ: %w", err)
}
if inDLQ {
kc.logger.Info.Printf("[ID:%s] Estado: Ya fue procesada anteriormente y enviada a DLQ, omitiendo", id)
// Cancelar cualquier reintento pendiente
kc.cancelarReintentosPendientes(id)
return database.ErrAlreadyProcessed
}
// Obtener estado anterior
_, _, prevEstado, _, err := kc.dbManager.VerificarExistenciaFactura(ctx, id)
if err != nil {
return fmt.Errorf("error obteniendo factura: %w", err)
}
// Actualizar estado
if err := kc.dbManager.ActualizarEstadoFactura(ctx, id, msg.Estado, "",
prevEstado, "Estado recibido de FACTURACION_ESTADO", ""); err != nil {
return fmt.Errorf("error actualizando estado: %w", err)
}
kc.logger.Info.Printf("[ID:%s] Estado: %s → %s", id, prevEstado, msg.Estado)
return nil
}
// procesa un mensaje del topic de facturación
func (kc *KafkaConsumer) processFacturaMessage(ctx context.Context, m kafka.Message) error {
// Verificar ID en el mensaje
id := string(m.Key)
if id == "" {
id = utils.GenerateUUID()
kc.logger.Info.Printf("Generado nuevo ID para mensaje sin key: %s", id)
}
kc.logger.Info.Printf("[ID:%s] Procesando mensaje de facturación", id)
// Deserializar mensaje
var evt models.FacturaEventModel
if err := json.Unmarshal(m.Value, &evt); err != nil {
kc.logger.Error.Printf("[ID:%s] Error deserializando mensaje: %v", id, err)
return fmt.Errorf("%w: %v", database.ErrInvalidMessage, err)
}
// Validar ID
if evt.TransaccionID == "" || !utils.IsValidUUID(evt.TransaccionID) {
kc.logger.Error.Printf("[ID:%s] ID de transacción inválido: %s", id, evt.TransaccionID)
return database.ErrInvalidUUID
}
// Procesar la factura
return kc.procesarFactura(ctx, evt)
}
// Modificar el método para integrar verificación del estado en SIAT
func (kc *KafkaConsumer) handleEstadoMessage(ctx context.Context, m kafka.Message) error {
// Verificar ID en el mensaje
id := string(m.Key)
if id == "" {
kc.logger.Error.Printf("Mensaje de estado sin ID, ignorando")
return errors.New("mensaje de estado sin ID")
}
kc.logger.Info.Printf("[ID:%s] Procesando mensaje de estado", id)
// Aplicar bloqueo para evitar procesamiento concurrente del mismo ID
if !kc.acquireProcessingLock(id) {
kc.logger.Info.Printf("[ID:%s] Estado: Ya está siendo procesada por otro worker, ignorando", id)
return nil
}
defer kc.releaseProcessingLock(id)
vRegCuf, idRegCompras, prevEstado, err := kc.dbManager.GetFacturacionFacturas(ctx, id)
if err != nil {
return fmt.Errorf("Error obteniendo datos de la tabla RegistroEmpresa: %w", err)
}
// Paso 2: Obtener datos de las tablas RegistroEmpresa - cuf - cufd
datosFactura, err := kc.dbManager.GetRegEmpresaCuisCuf(ctx, idRegCompras)
if err != nil {
return fmt.Errorf("error obteniendo datos empresa: %w", err)
}
// Verificar si ya fue procesada y enviada a DLQ
inDLQ, err := kc.dbManager.IsFacturaInDLQ(ctx, id)
if err != nil {
return fmt.Errorf("error verificando DLQ: %w", err)
}
if inDLQ {
kc.logger.Info.Printf("[ID:%s] Estado: Ya fue procesada anteriormente y enviada a DLQ, omitiendo", id)
// Cancelar cualquier reintento pendiente
kc.cancelarReintentosPendientes(id)
return database.ErrAlreadyProcessed
}
// Llamar al servicio SOAP de verificación de estado
callCtx, callCancel := context.WithTimeout(ctx, soapCallTimeout)
defer callCancel()
soapResp, statusCode, reqBody, respBody, dur, err := kc.soapClient.CallVerificacionEstadoFactura(
callCtx, datosFactura, vRegCuf)
kc.logger.Info.Printf("[ID:%s] Estado: Respuesta verificación SIAT: %+v", id, soapResp)
//MensajesList verificarEstadoSoap
mensajesJSON, jsonErr := json.Marshal(soapResp.MensajesList)
if jsonErr != nil {
kc.logger.Error.Printf("[ID:%s] Estado: Error al obtener el mensaje del servicio SOAP: %v", id, jsonErr)
return fmt.Errorf("Error al obtener el mensaje del servicio SOAP: %w", jsonErr)
}
// Registrar la interacción con el servicio usando el método helper
regErr := kc.registrarInteraccion(ctx, id, "VERIFICACION_ESTADO_FACTURA",
reqBody, respBody, statusCode, dur, err == nil, string(mensajesJSON))
if regErr != nil {
kc.logger.Error.Printf("[ID:%s] Estado: Error registrando interacción: %v", id, regErr)
}
if err != nil {
kc.logger.Error.Printf("[ID:%s] Estado: Error llamando al servicio de verificación: %v", id, err)
// Si falla la verificación, continuamos con la actualización de estado solicitada
kc.logger.Info.Printf("[ID:%s] Estado: Continuando con actualización de estado solicitada a pesar del error en SIAT", id)
// Actualizar estado con el mensaje recibido
if err := kc.dbManager.ActualizarEstadoFactura(ctx, id, "PENDIENTE", "",
prevEstado, "Estado recibido de FACTURACION_ESTADO - Verificación SIAT falló", ""); err != nil {
return fmt.Errorf("error actualizando estado: %w", err)
}
kc.logger.Info.Printf("[ID:%s] Estado: %s → %s (Verificación SIAT falló)",
id, prevEstado, "PENDIENTE")
return nil
}
return nil
}
// handleAnularMessage processes a message from the FACTURACION_ANULAR topic
func (kc *KafkaConsumer) handleAnularMessage(ctx context.Context, m kafka.Message) error {
// Check ID in the message
id := string(m.Key)
if id == "" {
kc.logger.Error.Printf("Anulación message without ID, ignoring")
return errors.New("anulación message without ID")
}
kc.logger.Info.Printf("[ID:%s] Processing invoice cancellation message", id)
// Aplicar bloqueo para evitar procesamiento concurrente del mismo ID
if !kc.acquireProcessingLock(id) {
kc.logger.Info.Printf("[ID:%s] Anulación: Ya está siendo procesada por otro worker, ignorando", id)
return nil
}
defer kc.releaseProcessingLock(id)
// Check if already processed and sent to DLQ
inDLQ, err := kc.dbManager.IsFacturaInDLQ(ctx, id)
if err != nil {
return fmt.Errorf("error checking DLQ: %w", err)
}
if inDLQ {
kc.logger.Info.Printf("[ID:%s] Anulación: Invoice already processed and sent to DLQ, skipping", id)
// Cancelar cualquier reintento pendiente
kc.cancelarReintentosPendientes(id)
return database.ErrAlreadyProcessed
}
// Get the CUF, company registration ID and previous state
vRegCuf, idRegCompras, prevEstado, err := kc.dbManager.GetFacturacionFacturas(ctx, id)
if err != nil {
return fmt.Errorf("error getting data from RegistroEmpresa table: %w", err)
}
// Verify invoice exists and is in FINALIZADA state
if prevEstado != "FINALIZADA" {
kc.logger.Error.Printf("[ID:%s] Anulación: Invoice cannot be cancelled - current state: %s", id, prevEstado)
return fmt.Errorf("invoice must be in FINALIZADA state to be cancelled, current state: %s", prevEstado)
}
// Get company data, CUIS and CUFD
datosFactura, err := kc.dbManager.GetRegEmpresaCuisCuf(ctx, idRegCompras)
if err != nil {
return fmt.Errorf("error getting company data: %w", err)
}
// Parse the cancellation reason code from the message (default to 1 if not provided)
var codigoMotivo int = 1
var anulacionData struct {
CodigoMotivo int `json:"codigo_motivo"`
}
if err := json.Unmarshal(m.Value, &anulacionData); err == nil && anulacionData.CodigoMotivo > 0 {
codigoMotivo = anulacionData.CodigoMotivo
}
// Call the SOAP service with retries
var lastErr error
for attempt := 0; attempt <= kc.maxRetries; attempt++ {
if attempt > 0 {
kc.logger.Info.Printf("[ID:%s] Anulación: Retry %d for cancellation", id, attempt)
// Usar select con context para asegurar que no se bloquee durante el shutdown
select {
case <-time.After(time.Duration(2<<attempt) * time.Second): // Exponential backoff
case <-ctx.Done():
return ctx.Err()
}
}
// Create a context with timeout for the SOAP call
callCtx, cancel := context.WithTimeout(ctx, soapCallTimeout)
soapResp, statusCode, reqBody, respBody, dur, err := kc.soapClient.CallAnulacionFactura(
callCtx, datosFactura, vRegCuf, codigoMotivo)
cancel()
// Log response
kc.logger.Info.Printf("[ID:%s] Anulación: SOAP response code description: %s", id, soapResp.CodigoDescripcion)
//MensajesList
mensajesJSON, jsonErr := json.Marshal(soapResp.MensajesList)
if jsonErr != nil {
kc.logger.Error.Printf("[ID:%s] Anulación: Error al obtener el mensaje del servicio SOAP: %v", id, jsonErr)
mensajesJSON = []byte("{}")
}
// Register interaction regardless of result using helper method
regErr := kc.registrarInteraccion(ctx, id, "ANULACION_FACTURA",
reqBody, respBody, statusCode, dur, err == nil && soapResp.Transaccion, string(mensajesJSON))
if regErr != nil {
kc.logger.Error.Printf("[ID:%s] Anulación: Error registering interaction: %v", id, regErr)
}
// Handle errors
if err != nil {
lastErr = err
kc.logger.Error.Printf("[ID:%s] Anulación: Attempt %d: Error calling SIAT: %v", id, attempt, err)
continue
}
// Successful transaction - update invoice
if err := kc.dbManager.ActualizarEstadoFactura(ctx, id, "ANULADA", "",
prevEstado, fmt.Sprintf("Invoice cancelled: %v", soapResp.Transaccion), ""); err != nil {
lastErr = fmt.Errorf("error updating cancelled invoice: %w", err)
kc.logger.Error.Printf("[ID:%s] Anulación: Error updating cancelled invoice: %v", id, err)
continue
}
// Cancelar cualquier reintento pendiente
kc.cancelarReintentosPendientes(id)
// Send successful result message
res := ResultMessage{
TransaccionID: id,
Estado: "ANULADA",
NumeroFactura: "", // We could get this from the database if needed
CodigoAutorizacion: soapResp.CodigoEstado,
}
if err := kc.resultWriter.WriteMessages(ctx,
kafka.Message{Key: []byte(id), Value: utils.MustMarshal(res)},
); err != nil {
return fmt.Errorf("error sending result: %w", err)
}
kc.logger.Info.Printf("[ID:%s] Anulación: Invoice cancelled successfully", id)
return nil
}
// If we get here, all retries failed
errMsg := "Error desconocido en anulación"
if lastErr != nil {
errMsg = lastErr.Error()
}
kc.logger.Error.Printf("[ID:%s] Anulación: All retries exhausted: %v", id, lastErr)
// Cancelar cualquier reintento pendiente
kc.cancelarReintentosPendientes(id)
// Create JSON message for DLQ
msgJson, _ := json.Marshal(map[string]interface{}{
"transaccion_id": id,
"estado": "ERROR_ANULACION",
"mensaje_error": errMsg,
})
// Send error result message
if err := kc.resultWriter.WriteMessages(ctx, kafka.Message{
Key: []byte(id),
Value: utils.MustMarshal(ResultMessage{
TransaccionID: id,
Estado: "ERROR_ANULACION",
NumeroFactura: "",
MensajeError: errMsg,
}),
}); err != nil {
kc.logger.Error.Printf("[ID:%s] Anulación: Error sending error result: %v", id, err)
}
// Register in facturacion_dlq table in database
if err := kc.dbManager.InsertarEnDLQ(ctx, id, string(msgJson), "ERROR_ANULACION"); err != nil {
kc.logger.Error.Printf("[ID:%s] Anulación: Error inserting into DLQ DB: %v", id, err)
} else {
kc.logger.Info.Printf("[ID:%s] Anulación: Invoice registered in database DLQ", id)
}
// Send to Kafka DLQ ONLY IF not already in DLQ
inDLQ, err = kc.dbManager.IsFacturaInDLQ(ctx, id)
if err != nil {
kc.logger.Error.Printf("[ID:%s] Anulación: Error checking DLQ: %v", id, err)
} else if !inDLQ {
if err := kc.dlqWriter.WriteMessages(ctx, kafka.Message{
Key: []byte(id),
Value: msgJson,
}); err != nil {
kc.logger.Error.Printf("[ID:%s] Anulación: Error sending to Kafka DLQ: %v", id, err)
} else {
kc.logger.Info.Printf("[ID:%s] Anulación: Invoice sent to Kafka DLQ", id)
}
} else {
kc.logger.Info.Printf("[ID:%s] Anulación: Invoice already exists in DLQ, won't send to Kafka DLQ again", id)
}
if lastErr != nil {
return lastErr
}
return errors.New(errMsg)
}
// -----------------------------------------------------------------------------
// Kafka Worker Pool
// -----------------------------------------------------------------------------
// runAsyncGroup implementa un pool de processors para procesar mensajes Kafka
func (kc *KafkaConsumer) runAsyncGroup(
ctx context.Context,
cfg kafka.ReaderConfig,
handler func(context.Context, kafka.Message) error,
name string,
workerCount, chanBuffer int,
) {
kc.logger.Info.Printf("[%s] Iniciando grupo asíncrono con %d processors", name, workerCount)
// 1) Reader dinámico usando atomic.Value
var readerPtr atomic.Value
readerPtr.Store(kafka.NewReader(cfg))
// 2) Canales de comunicación
msgCh := make(chan kafka.Message, chanBuffer)
resetCh := make(chan struct{}, 1)
var wg sync.WaitGroup
// --- a) Fetcher: obtiene mensajes y los envía al canal
wg.Add(1)
go func() {
defer wg.Done()
defer close(msgCh) // cierra msgCh al salir para que processors terminen
kc.logger.Info.Printf("[%s] Fetcher iniciado", name)
for {
select {
case <-ctx.Done():
kc.logger.Info.Printf("[%s] Fetcher: contexto cancelado", name)
return
case <-resetCh:
kc.logger.Info.Printf("[%s] Reiniciando reader", name)
old := readerPtr.Load().(*kafka.Reader)
old.Close()
readerPtr.Store(kafka.NewReader(cfg))
// Usar select con context para la espera
select {
case <-time.After(kafkaReconnectDelay):
// Continuar
case <-ctx.Done():
return
}
default:
// Obtener reader actual
r := readerPtr.Load().(*kafka.Reader)
// Fetch con timeout
fc, cancel := context.WithTimeout(ctx, 10*time.Second)
m, err := r.FetchMessage(fc)
cancel()
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
continue
}
kc.logger.Error.Printf("[%s] Error en FetchMessage: %v", name, err)
// Verificar si es error de conexión
if strings.Contains(err.Error(), "connection") ||
strings.Contains(err.Error(), "broken pipe") ||
strings.Contains(err.Error(), "EOF") {
select {
case resetCh <- struct{}{}:
default:
// Reset ya en progreso
}
}
select {
case <-time.After(kafkaReconnectDelay):
continue
case <-ctx.Done():
return
}
}
// Enviar mensaje al canal
select {
case msgCh <- m:
kc.logger.Debug.Printf("[%s] Mensaje enviado a processors: key=%s", name, string(m.Key))
case <-ctx.Done():
kc.logger.Info.Printf("[%s] Fetcher: contexto cancelado durante envío", name)
return
}
}
}
}()
// --- b) Health-checker: verifica la salud de la conexión Kafka
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(healthCheckInterval)
defer ticker.Stop()
kc.logger.Info.Printf("[%s] Health-checker iniciado", name)
for {
select {
case <-ctx.Done():
kc.logger.Info.Printf("[%s] Health-checker: contexto cancelado", name)
return
case <-ticker.C:
r := readerPtr.Load().(*kafka.Reader)
// Intenta obtener el offset como verificación de salud
if err := r.SetOffset(r.Offset()); err != nil {
kc.logger.Error.Printf("[%s] Conexión Kafka no saludable: %v", name, err)
select {
case resetCh <- struct{}{}:
default:
// Reset ya en progreso
}
}
}
}
}()
// --- c) Pool de processors: procesan los mensajes
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
kc.logger.Info.Printf("[%s] Worker-%d iniciado", name, id)
for m := range msgCh {
key := string(m.Key)
kc.logger.Info.Printf("[%s] Worker-%d: procesando mensaje key=%s", name, id, key)
// Contexto con timeout para procesamiento
processCtx, cancelProcess := context.WithTimeout(ctx, workerProcessTimeout)
// Procesar mensaje con recovery de panic
func() {
defer func() {
if r := recover(); r != nil {
kc.logger.Error.Printf("[%s] Worker-%d: PANIC procesando key=%s: %v", name, id, key, r)
// Registrar el error en DLQ
dlqCtx, cancelDLQ := context.WithTimeout(context.Background(), 5*time.Second)
if err := kc.dlqWriter.WriteMessages(dlqCtx,
kafka.Message{
Key: m.Key,
Value: utils.MustMarshal(map[string]interface{}{
"transaccion_id": key,
"estado": "ERROR_PANIC",
"mensaje_error": fmt.Sprintf("Panic: %v", r),
}),
},
); err != nil {
kc.logger.Error.Printf("[%s] Worker-%d: error enviando panic a DLQ: %v", name, id, err)
}
cancelDLQ()
}
}()
// Procesar mensaje
procErr := handler(processCtx, m)
if procErr != nil {
if errors.Is(procErr, database.ErrAlreadyProcessed) {
kc.logger.Info.Printf("[%s] Worker-%d: mensaje key=%s ya procesado", name, id, key)
} else {
kc.logger.Error.Printf("[%s] Worker-%d: error procesando key=%s: %v", name, id, key, procErr)
// Solo enviar a DLQ si no es un error de datos ya procesados
if !errors.Is(procErr, database.ErrAlreadyProcessed) {
dlqCtx, cancelDLQ := context.WithTimeout(ctx, 5*time.Second)
if err := kc.dlqWriter.WriteMessages(dlqCtx,
kafka.Message{Key: m.Key, Value: m.Value},
); err != nil {
kc.logger.Error.Printf("[%s] Worker-%d: error enviando a DLQ: %v", name, id, err)
}
cancelDLQ()
}
}
}
}()
cancelProcess()
// Commit del mensaje (incluso si hubo error, para no reprocesar)
r := readerPtr.Load().(*kafka.Reader)
commitCtx, cancelCommit := context.WithTimeout(ctx, kafkaCommitTimeout)
if err := r.CommitMessages(commitCtx, m); err != nil {
kc.logger.Error.Printf("[%s] Worker-%d: error en commit key=%s: %v", name, id, key, err)
// Verificar si es error de conexión
if strings.Contains(err.Error(), "connection") {
select {
case resetCh <- struct{}{}:
default:
// Reset ya en progreso
}
}
} else {
kc.logger.Info.Printf("[%s] Worker-%d: commit OK para key=%s", name, id, key)
}
cancelCommit()
}
kc.logger.Info.Printf("[%s] Worker-%d finalizado", name, id)
}(i)
}
// Esperar a que todos los goroutines terminen
wg.Wait()
kc.logger.Info.Printf("[%s] Grupo asíncrono finalizado", name)
}
// Run inicia los consumidores Kafka
func (kc *KafkaConsumer) Run(ctx context.Context) {
kc.logger.Info.Println("Iniciando KafkaConsumer...")
// Configuraciones para los readers
factCfg := kc.factReader.Load().(*kafka.Reader).Config()
estCfg := kc.estadoReader.Load().(*kafka.Reader).Config()
anularCfg := kc.anularReader.Load().(*kafka.Reader).Config() // New configuration
// Cerrar los readers actuales
kc.factReader.Load().(*kafka.Reader).Close()
kc.estadoReader.Load().(*kafka.Reader).Close()
kc.anularReader.Load().(*kafka.Reader).Close() // Close anular reader
// Performance parameters
const (
factWorkers = defaultWorkerCount
estWorkers = 5 // fewer processors for status topic
anularWorkers = 3 // processors for cancellation topic
chanBufSize = defaultChannelBuffer
)
var topWg sync.WaitGroup
topWg.Add(3)
// Iniciar grupo para procesamiento de facturas
go func() {
defer topWg.Done()
kc.runAsyncGroup(ctx, factCfg, kc.processFacturaMessage, "FACTURACION", factWorkers, chanBufSize)
}()
// Iniciar grupo para procesamiento de estados
go func() {
defer topWg.Done()
kc.runAsyncGroup(ctx, estCfg, kc.handleEstadoMessage, "FACTURACION_ESTADO", estWorkers, chanBufSize)
}()
// Start group for cancellation processing
go func() {
defer topWg.Done()
kc.runAsyncGroup(ctx, anularCfg, kc.handleAnularMessage, "FACTURACION_ANULAR", anularWorkers, chanBufSize)
}()
// Esperar a que se cancele el contexto
<-ctx.Done()
kc.logger.Info.Println("Contexto cancelado, iniciando shutdown...")
// Esperar a que terminen los grupos
topWg.Wait()
kc.logger.Info.Println("KafkaConsumer finalizado correctamente")
}