// internal/kafka/consumer.go package kafka import ( "context" "encoding/json" "encoding/xml" "errors" "fmt" "os" "path/filepath" "strconv" "strings" "sync" "sync/atomic" "time" "consumer/internal/database" "consumer/internal/logging" "consumer/internal/models" "consumer/internal/models/entidad_db" "consumer/internal/notifier" soapClient "consumer/internal/soap" "consumer/internal/utils" "consumer/internal/utils/algoritmo" "consumer/internal/utils/generadorFirmaDigital" //"github.com/google/uuid" kafka "github.com/segmentio/kafka-go" ) // ----------------------------------------------------------------------------- // Constants // ----------------------------------------------------------------------------- const ( defaultMaxRetries = 3 // 4 reintentos 0,1,2,3 // Retry delay retryDelay = 5 * time.Second // Tiempo de espera entre reintentos defaultWorkerCount = 10 defaultChannelBuffer = 1000 // Timeouts dbQueryTimeout = 5 * time.Second // Reducir de 10s a 5s soapCallTimeout = 20 * time.Second // Reducir de 30s a 20s kafkaCommitTimeout = 3 * time.Second // Reducir de 5s a 3s workerProcessTimeout = 20 * time.Second // Reducir de 30s a 20s // Sleep durations kafkaReconnectDelay = 1 * time.Second // Reducir de 2s a 1s healthCheckInterval = 15 * time.Second // Reducir de 30s a 15s ) // ----------------------------------------------------------------------------- // Message Types // ----------------------------------------------------------------------------- type KafkaMessage struct { TransaccionID string `json:"transaccion_id"` Estado string `json:"estado"` NumeroFactura string `json:"numero_factura"` } type ResultMessage struct { TransaccionID string `json:"transaccion_id"` Estado string `json:"estado"` NumeroFactura string `json:"numero_factura"` CodigoAutorizacion string `json:"codigo_autorizacion,omitempty"` MensajeError string `json:"mensaje_error,omitempty"` } // ----------------------------------------------------------------------------- // Retry Timers and Processing Locks // ----------------------------------------------------------------------------- var ( retryTimers = make(map[string]*time.Timer) retryTimersMtx sync.Mutex // Mecanismo de bloqueo para prevenir procesamiento concurrente processingLock sync.Mutex processingIDs = make(map[string]bool) ) // ----------------------------------------------------------------------------- // Kafka Consumer // ----------------------------------------------------------------------------- type KafkaConsumer struct { factReader atomic.Value // *kafka.Reader estadoReader atomic.Value // *kafka.Reader anularReader atomic.Value // *kafka.Reader resultWriter *kafka.Writer dlqWriter *kafka.Writer dbManager *database.DBManager soapClient *soapClient.SOAPClient notifier *notifier.Notifier logger *logging.LoggerSystem maxRetries int // Para manejar shutdown shutdownTimers sync.WaitGroup } // NewKafkaConsumer crea una nueva instancia del consumidor Kafka func NewKafkaConsumer( brokers []string, groupID, topicFact, topicEst, topicAnular, resTopic, dlqTopic string, dbManager *database.DBManager, soapClient *soapClient.SOAPClient, notifier *notifier.Notifier, logger *logging.LoggerSystem, ) *KafkaConsumer { kc := &KafkaConsumer{ resultWriter: kafka.NewWriter(kafka.WriterConfig{ Brokers: brokers, Topic: resTopic, Balancer: &kafka.LeastBytes{}, BatchSize: 1, Async: false, }), dlqWriter: kafka.NewWriter(kafka.WriterConfig{ Brokers: brokers, Topic: dlqTopic, Balancer: &kafka.LeastBytes{}, BatchSize: 1, Async: false, }), dbManager: dbManager, soapClient: soapClient, notifier: notifier, logger: logger, maxRetries: defaultMaxRetries, } // Initialize atomic readers factReader := kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, GroupID: groupID + "-fact", Topic: topicFact, MinBytes: 1, MaxBytes: 10e6, MaxWait: 100 * time.Millisecond, ReadBackoffMin: 100 * time.Millisecond, ReadBackoffMax: 5 * time.Second, }) kc.factReader.Store(factReader) estReader := kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, GroupID: groupID + "-estado", Topic: topicEst, MinBytes: 1, MaxBytes: 10e6, MaxWait: 100 * time.Millisecond, ReadBackoffMin: 100 * time.Millisecond, ReadBackoffMax: 5 * time.Second, }) kc.estadoReader.Store(estReader) // Add reader for ANULAR topic anularReader := kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, GroupID: groupID + "-anular", Topic: topicAnular, MinBytes: 1, MaxBytes: 10e6, MaxWait: 100 * time.Millisecond, ReadBackoffMin: 100 * time.Millisecond, ReadBackoffMax: 5 * time.Second, }) kc.anularReader.Store(anularReader) return kc } // ----------------------------------------------------------------------------- // Control de acceso concurrente // ----------------------------------------------------------------------------- // acquireProcessingLock intenta adquirir un bloqueo para procesar un ID específico // Retorna true si el bloqueo fue adquirido, false si ya está siendo procesado func (kc *KafkaConsumer) acquireProcessingLock(id string) bool { processingLock.Lock() defer processingLock.Unlock() if processingIDs[id] { return false } processingIDs[id] = true return true } // releaseProcessingLock libera el bloqueo para un ID específico func (kc *KafkaConsumer) releaseProcessingLock(id string) { processingLock.Lock() defer processingLock.Unlock() delete(processingIDs, id) } // cancelarReintentosPendientes cancela cualquier reintento programado para una factura func (kc *KafkaConsumer) cancelarReintentosPendientes(id string) { retryTimersMtx.Lock() defer retryTimersMtx.Unlock() if timer, exists := retryTimers[id]; exists { timer.Stop() delete(retryTimers, id) kc.logger.Info.Printf("[ID:%s] Cancelados reintentos pendientes", id) } } // Método helper para registrar interacciones de forma consistente func (kc *KafkaConsumer) registrarInteraccion(ctx context.Context, id, tipoServicio, reqBody, respBody string, statusCode int, dur int64, exitoso bool, vMsgRespSoap string) error { return kc.dbManager.RegistrarInteraccionServicio( ctx, id, tipoServicio, kc.soapClient.SoapURL, reqBody, respBody, statusCode, dur, exitoso, vMsgRespSoap) } func (kc *KafkaConsumer) Close() { // Cancelar todos los timers pendientes retryTimersMtx.Lock() for id, timer := range retryTimers { timer.Stop() delete(retryTimers, id) } retryTimersMtx.Unlock() // Esperar a que todas las goroutines de reintento terminen kc.shutdownTimers.Wait() if reader := kc.factReader.Load(); reader != nil { reader.(*kafka.Reader).Close() } if reader := kc.estadoReader.Load(); reader != nil { reader.(*kafka.Reader).Close() } if reader := kc.anularReader.Load(); reader != nil { reader.(*kafka.Reader).Close() } kc.resultWriter.Close() kc.dlqWriter.Close() kc.logger.Info.Println("Cerrando KafkaConsumer") } // ----------------------------------------------------------------------------- // Message Processing // ----------------------------------------------------------------------------- // procesarFactura procesa una factura, evitando el bucle infinito y el procesamiento duplicado func (kc *KafkaConsumer) procesarFactura(ctx context.Context, evt models.FacturaEventModel) error { id := evt.TransaccionID // Aplicar bloqueo para evitar procesamiento concurrente del mismo ID if !kc.acquireProcessingLock(id) { kc.logger.Info.Printf("[ID:%s] Ya está siendo procesada por otro worker, ignorando", id) return nil } defer kc.releaseProcessingLock(id) kc.logger.Info.Printf("[ID:%s] Iniciando procesamiento de factura", id) // Paso 1: Verificar si ya fue procesada y enviada a DLQ inDLQ, err := kc.dbManager.IsFacturaInDLQ(ctx, id) if err != nil { return fmt.Errorf("error verificando DLQ: %w", err) } if inDLQ { kc.logger.Info.Printf("[ID:%s] Ya fue procesada anteriormente y enviada a DLQ, omitiendo", id) kc.cancelarReintentosPendientes(id) return database.ErrAlreadyProcessed } // NUEVO: Verificar si la factura ya está en estado FINALIZADA exists, numeroFactura, estadoActual, fechaEmision, err := kc.dbManager.VerificarExistenciaFactura(ctx, id) if err != nil { return fmt.Errorf("error verificando factura: %w", err) } if exists && estadoActual == "FINALIZADA" && !strings.EqualFold(evt.Estado, "REPROCESAR") { kc.logger.Info.Printf("[ID:%s] Ya está FINALIZADA, ignorando procesamiento duplicado", id) // Cancelar cualquier reintento pendiente kc.cancelarReintentosPendientes(id) // Enviar mensaje de resultado confirmando estado res := ResultMessage{ TransaccionID: id, Estado: "FINALIZADA", NumeroFactura: numeroFactura, } if err := kc.resultWriter.WriteMessages(ctx, kafka.Message{Key: []byte(id), Value: utils.MustMarshal(res)}, ); err != nil { kc.logger.Error.Printf("[ID:%s] Error enviando resultado de confirmación: %v", id, err) } return nil } // Paso 2: Obtener datos de empresa nit := evt.Payload.Cabecera.NitEmisor codigoSucursal := evt.Payload.Cabecera.CodigoSucursal // nit de tipo 2 ofuscar if evt.Payload.Cabecera.CodigoMetodoPago == 2 { // Aquí iría la lógica de ofuscación si es necesaria } if evt.Payload.Cabecera.CodigoPuntoVenta.Value == nil { kc.logger.Error.Printf("[ID:%s] CodigoPuntoVenta es nulo: %w", id, database.ErrNilPointer) return fmt.Errorf("CodigoPuntoVenta es nulo: %w", database.ErrNilPointer) } codigoPuntoVenta := *evt.Payload.Cabecera.CodigoPuntoVenta.Value datosFactura, err := kc.dbManager.GetDatosFacturaEmpresa(ctx, nit, codigoSucursal, codigoPuntoVenta) if err != nil { kc.logger.Error.Printf("[ID:%s] error obteniendo datos empresa: %w", id, err) return fmt.Errorf("error obteniendo datos empresa: %w", err) } // Paso 3: Obtener CUIS y CUFD _, cufd, codControl, err := kc.dbManager.GetCUISDatosFactura(ctx, datosFactura.RegistroEmpresaID) if err != nil { kc.logger.Error.Printf("[ID:%s] error obteniendo CUIS/CUFD: %w", id, err) return fmt.Errorf("error obteniendo CUIS/CUFD: %w", err) } // Paso 4: Generar CUF dCuf := evt.Payload.Cabecera // Formatear fecha para CUF formatDate, err := utils.FormatFecha(dCuf.FechaEmision) if err != nil { kc.logger.Error.Printf("[ID:%s] error formateando fecha: %w", id, err) return fmt.Errorf("error formateando fecha: %w", err) } vCuf := algoritmo.GenerateCUF( strconv.FormatInt(dCuf.NitEmisor, 10), formatDate, strconv.Itoa(dCuf.CodigoSucursal), strconv.Itoa(datosFactura.CodigoModalidad), strconv.Itoa(dCuf.TipoEmision), strconv.Itoa(dCuf.TipoFactura), strconv.Itoa(dCuf.CodigoDocumentoSector), dCuf.NumeroFactura, strconv.Itoa(*dCuf.CodigoPuntoVenta.Value), codControl, ) // Actualizar CUF y CUFD en la cabecera evt.Payload.Cabecera.Cuf = vCuf evt.Payload.Cabecera.Cufd = cufd // Paso 5: Verificar existencia de la factura y obtener datos // (Ya tenemos estos datos de la verificación anterior) prevEstado := estadoActual // Construir la factura electrónica fact := models.FacturaElectronicaCompraVenta{ Xsi: "http://www.w3.org/2001/XMLSchema-instance", NoNamespaceSchemaLocation: "facturaElectronicaCompraVenta.xsd", Cabecera: evt.Payload.Cabecera, Detalle: evt.Payload.Detalle, } // Serializar a XML con indentación vFactura, err := xml.MarshalIndent(fact, "", " ") if err != nil { kc.logger.Error.Printf("[ID:%s] error al serializar XML: %v", id, err) return fmt.Errorf("error al serializar XML: %v", err) } kc.logger.Info.Printf("[ID:%s] XML factura creado", id) // Si la factura no existe, crearla if !exists { if strings.EqualFold(evt.Estado, "REPROCESAR") { kc.logger.Info.Printf("[ID:%s] factura %s no existe para reprocesar", id) return fmt.Errorf("factura %s no existe para reprocesar", id) } // Valores por defecto para nueva factura numeroFactura = evt.Payload.Cabecera.NumeroFactura fechaEmision = utils.NowInBolivia() prevEstado = "PENDIENTE" vUrlFact, err := utils.ConstruirURLSIAT(numeroFactura, vCuf) if err != nil { kc.logger.Error.Printf("[ID:%s] Error al concatenar la url del siat: %w", id, err) return fmt.Errorf("Error al concatenar la url del siat: %w", err) } // Crear la factura if err := kc.dbManager.CrearFacturaPendiente(ctx, id, fechaEmision, prevEstado, vCuf, vUrlFact, datosFactura.RegistroEmpresaID, evt.Payload.Cabecera.CodigoDocumentoSector, string(vFactura)); err != nil { return fmt.Errorf("error creando factura: %w", err) } } // Paso 6: Procesar según el estado if strings.EqualFold(evt.Estado, "REPROCESAR") { // Revertir la factura if err := kc.dbManager.ActualizarEstadoFactura(ctx, id, "REVERTIDA", "", prevEstado, "Factura revertida por solicitud", ""); err != nil { kc.logger.Error.Printf("[ID:%s] error revirtiendo factura: %w", id, err) return fmt.Errorf("error revirtiendo factura: %w", err) } // Enviar mensaje de resultado res := ResultMessage{ TransaccionID: id, Estado: "REVERTIDA", NumeroFactura: numeroFactura, } if err := kc.resultWriter.WriteMessages(ctx, kafka.Message{Key: []byte(id), Value: utils.MustMarshal(res)}, ); err != nil { return fmt.Errorf("error enviando resultado: %w", err) } kc.logger.Info.Printf("[ID:%s] Factura revertida exitosamente", id) return nil } // Paso 7: Generar el XML, firmarlo y enviarlo a SIAT // Serializar a XML xmlBytes, err := xml.MarshalIndent(fact, "", " ") if err != nil { kc.logger.Error.Printf("[ID:%s] error serializando XML: %w", id, err) return fmt.Errorf("error serializando XML: %w", err) } xmlBytes = append([]byte(xml.Header), xmlBytes...) // Rutas de certificados privKeyPath := filepath.Join("internal", "utils", "generadorFirmaDigital", "certificados", datosFactura.NombreArchivoClavePrivada) certPath := filepath.Join("internal", "utils", "generadorFirmaDigital", "certificados", datosFactura.NombreArchivoCertificado) priv, err := generadorFirmaDigital.CargarLlavePrivada(privKeyPath) if err != nil { kc.logger.Error.Printf("[ID:%s] error cargando llave privada: %w", id, err) return fmt.Errorf("error cargando llave privada: %w", err) } //certificado cert, err := generadorFirmaDigital.CargarCertificado(certPath) if err != nil { kc.logger.Error.Printf("[ID:%s] error cargando certificado: %w", id, err) return fmt.Errorf("error cargando certificado: %w", err) } // Firmar XML firmado, err := generadorFirmaDigital.FirmarXML(xmlBytes, priv, cert) if err != nil { kc.logger.Error.Printf("[ID:%s] error firmando XML: %w", id, err) return fmt.Errorf("error firmando XML: %w", err) } // Calcular hash hashArchivo := generadorFirmaDigital.Sha256Hex(firmado) // Comprimir y codificar gz64, err := generadorFirmaDigital.GzipBase64(firmado) if err != nil { kc.logger.Error.Printf("[ID:%s] error comprimiendo XML: %w", id, err) return fmt.Errorf("error comprimiendo XML: %w", err) } // Paso 8: Enviar a SIAT con reintentos - MEJORADO return kc.procesarConReintentosInicial(ctx, evt, id, firmado, gz64, hashArchivo, datosFactura, numeroFactura, prevEstado) } // procesarConReintentosInicial es el punto de entrada para el procesamiento inicial y reintentos // MEJORADO: Evita bucle infinito y solo hace el primer intento func (kc *KafkaConsumer) procesarConReintentosInicial(ctx context.Context, evt models.FacturaEventModel, id string, firmado []byte, gz64 string, hashArchivo string, datosFactura entidad_db.DatosFactura, numeroFactura string, prevEstado string) error { var lastErr error kc.logger.Info.Printf("[ID:%s][Attempt:0] Iniciando intento inicial de envío a SIAT", id) // Solo realizar el intento inicial (intento 0) // Crear un contexto con timeout para la llamada SOAP callCtx, cancel := context.WithTimeout(ctx, soapCallTimeout) soapResp, statusCode, reqBody, respBody, dur, err := kc.soapClient.CallService( callCtx, datosFactura, gz64, hashArchivo) cancel() kc.logger.Info.Printf("[ID:%s][Attempt:0] Respuesta SOAP: %s", id, soapResp.CodigoDescripcion) estadoSoap := soapResp.CodigoDescripcion //MensajesList mensajesJSON, jsonErr := json.Marshal(soapResp.MensajesList) if jsonErr != nil { kc.logger.Error.Printf("[ID:%s][Attempt:0] Error al obtener el mensaje del servicio SOAP: %v", id, jsonErr) mensajesJSON = []byte("{}") } // IMPORTANTE: Registrar la interacción independientemente del resultado regErr := kc.registrarInteraccion(ctx, id, "RECEPCION_FACTURA", reqBody, respBody, statusCode, dur, err == nil && soapResp.Transaccion, string(mensajesJSON)) if regErr != nil { kc.logger.Error.Printf("[ID:%s][Attempt:0] Error registrando interacción: %v", id, regErr) } else { kc.logger.Info.Printf("[ID:%s][Attempt:0] Interacción registrada", id) } // Manejar errores if err != nil || !soapResp.Transaccion { if err != nil { lastErr = err kc.logger.Error.Printf("[ID:%s][Attempt:0] Error llamando SIAT: %v", id, err) } else { lastErr = fmt.Errorf("SIAT error: %s", soapResp.CodigoDescripcion) kc.logger.Error.Printf("[ID:%s][Attempt:0] SIAT rechazó la transacción: %s", id, soapResp.CodigoDescripcion) } // Programar el primer reintento asíncrono y terminar kc.programarReintento(evt, id, firmado, gz64, hashArchivo, datosFactura, 1, estadoSoap, numeroFactura, prevEstado) return nil } // Transacción exitosa - actualizar la factura kc.logger.Info.Printf("[ID:%s][Attempt:0] Transacción exitosa, actualizando factura", id) if err := kc.dbManager.ActualizarEstadoFactura(ctx, id, "FINALIZADA", "", prevEstado, fmt.Sprintf("Factura finalizada: %v", soapResp.Transaccion), string(firmado)); err != nil { lastErr = fmt.Errorf("error actualizando factura finalizada: %w", err) kc.logger.Error.Printf("[ID:%s][Attempt:0] Error actualizando factura finalizada: %v", id, err) // Programar reintento kc.programarReintento(evt, id, firmado, gz64, hashArchivo, datosFactura, 1, estadoSoap, numeroFactura, prevEstado) return nil } // Cancelar cualquier reintento pendiente kc.cancelarReintentosPendientes(id) // Enviar mensaje de resultado exitoso res := ResultMessage{ TransaccionID: id, Estado: "FINALIZADA", NumeroFactura: numeroFactura, CodigoAutorizacion: soapResp.CodigoEstado, } if err := kc.resultWriter.WriteMessages(ctx, kafka.Message{Key: []byte(id), Value: utils.MustMarshal(res)}, ); err != nil { return fmt.Errorf("error enviando resultado: %w", err) } kc.logger.Info.Printf("[ID:%s][Attempt:0] Factura finalizada exitosamente", id) return lastErr } // programa un reintento asíncrono evitando llamadas recursivas func (kc *KafkaConsumer) programarReintento(evt models.FacturaEventModel, id string, firmado []byte, gz64 string, hashArchivo string, datosFactura entidad_db.DatosFactura, attempt int, estadoSoapAnterior string, numeroFactura string, prevEstado string) { kc.logger.Info.Printf("[ID:%s][Attempt:%d] Programando reintento en %v", id, attempt, retryDelay) retryTimersMtx.Lock() // Cancelar timer anterior si existe if oldTimer, exists := retryTimers[id]; exists { oldTimer.Stop() kc.logger.Info.Printf("[ID:%s] Timer anterior cancelado", id) } // Incrementar contador para esperar goroutines durante el cierre kc.shutdownTimers.Add(1) // Crear nuevo timer retryTimers[id] = time.AfterFunc(retryDelay, func() { defer kc.shutdownTimers.Done() // Remover el timer del mapa cuando se ejecuta retryTimersMtx.Lock() delete(retryTimers, id) retryTimersMtx.Unlock() // Crear un nuevo contexto para el reintento newCtx, cancel := context.WithTimeout(context.Background(), workerProcessTimeout) defer cancel() kc.logger.Info.Printf("[ID:%s][Attempt:%d] Ejecutando reintento programado", id, attempt) // Ejecutar el reintento kc.ejecutarReintento(newCtx, evt, id, firmado, gz64, hashArchivo, datosFactura, attempt, estadoSoapAnterior, numeroFactura, prevEstado) }) retryTimersMtx.Unlock() } // maneja un reintento específico con verificaciones mejoradas func (kc *KafkaConsumer) ejecutarReintento(ctx context.Context, evt models.FacturaEventModel, id string, firmado []byte, gz64 string, hashArchivo string, datosFactura entidad_db.DatosFactura, attempt int, estadoSoapAnterior string, numeroFactura string, prevEstado string) { // No necesitamos adquirir el bloqueo global porque cada reintento es independiente // y ya verificamos el estado actual de la factura // IMPORTANTE: Verificar si la factura ya está finalizada antes de ejecutar cualquier reintento exists, _, currentEstado, _, err := kc.dbManager.VerificarExistenciaFactura(ctx, id) if err != nil { kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error verificando estado actual: %v", id, attempt, err) } else if exists && currentEstado == "FINALIZADA" { kc.logger.Info.Printf("[ID:%s][Attempt:%d] Ignorando reintento porque ya está FINALIZADA", id, attempt) // Eliminar cualquier timer pendiente kc.cancelarReintentosPendientes(id) return } // IMPORTANTE: Verificar también si la factura ya está en la DLQ inDLQ, err := kc.dbManager.IsFacturaInDLQ(ctx, id) if err != nil { kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error verificando DLQ: %v", id, attempt, err) } else if inDLQ { kc.logger.Info.Printf("[ID:%s][Attempt:%d] Ignorando reintento porque ya está en DLQ", id, attempt) // Eliminar cualquier timer pendiente kc.cancelarReintentosPendientes(id) return } // IMPORTANTE: Verificar que no excedamos el número máximo de reintentos if attempt > kc.maxRetries { kc.logger.Error.Printf("[ID:%s][Attempt:%d] Se alcanzó el número máximo de reintentos", id, attempt) // Manejar como error final kc.manejarErrorFinal(ctx, evt, id, estadoSoapAnterior, numeroFactura, "Máximo de reintentos alcanzado") return } var lastErr error var estadoSoap string = estadoSoapAnterior // Crear un contexto con timeout para la llamada SOAP callCtx, cancel := context.WithTimeout(ctx, soapCallTimeout) soapResp, statusCode, reqBody, respBody, dur, err := kc.soapClient.CallService( callCtx, datosFactura, gz64, hashArchivo) cancel() kc.logger.Info.Printf("[ID:%s][Attempt:%d] Respuesta SOAP: %s", id, attempt, soapResp.CodigoDescripcion) estadoSoap = soapResp.CodigoDescripcion mensajesJSON, jsonErr := json.Marshal(soapResp.MensajesList) if jsonErr != nil { kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error al obtener el mensaje del servicio SOAP: %v", id, attempt, jsonErr) mensajesJSON = []byte("{}") } // IMPORTANTE: Registrar la interacción en reintentos regErr := kc.registrarInteraccion(ctx, id, "RECEPCION_FACTURA", reqBody, respBody, statusCode, dur, err == nil && soapResp.Transaccion, string(mensajesJSON)) if regErr != nil { kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error registrando interacción: %v", id, attempt, regErr) } else { kc.logger.Info.Printf("[ID:%s][Attempt:%d] Interacción registrada", id, attempt) } // Manejar errores if err != nil || !soapResp.Transaccion { if err != nil { lastErr = err kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error llamando SIAT: %v", id, attempt, err) } else { lastErr = fmt.Errorf("SIAT error: %s", soapResp.CodigoDescripcion) kc.logger.Error.Printf("[ID:%s][Attempt:%d] SIAT rechazó la transacción: %s", id, attempt, soapResp.CodigoDescripcion) } // Si quedan reintentos, programar el siguiente if attempt < kc.maxRetries { kc.programarReintento(evt, id, firmado, gz64, hashArchivo, datosFactura, attempt+1, estadoSoap, numeroFactura, prevEstado) } else { // No quedan reintentos, manejar como error final errMsg := "Máximo de reintentos alcanzado" if lastErr != nil { errMsg = lastErr.Error() } kc.manejarErrorFinal(ctx, evt, id, estadoSoap, numeroFactura, errMsg) } return } // Transacción exitosa kc.logger.Info.Printf("[ID:%s][Attempt:%d] Transacción exitosa, actualizando factura", id, attempt) // Verificar nuevamente si la factura ya fue finalizada (puede haber ocurrido por otro proceso) exists, _, currentEstado, _, err = kc.dbManager.VerificarExistenciaFactura(ctx, id) if err != nil { kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error verificando estado actual: %v", id, attempt, err) } else if exists && currentEstado == "FINALIZADA" { kc.logger.Info.Printf("[ID:%s][Attempt:%d] La factura ya fue FINALIZADA por otro proceso", id, attempt) // Cancelar cualquier reintento pendiente kc.cancelarReintentosPendientes(id) return } // Actualizar la factura if err := kc.dbManager.ActualizarEstadoFactura(ctx, id, "FINALIZADA", "", prevEstado, fmt.Sprintf("Factura finalizada: %v", soapResp.Transaccion), string(firmado)); err != nil { lastErr = fmt.Errorf("error actualizando factura finalizada: %w", err) kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error actualizando factura finalizada: %v", id, attempt, err) // Si quedan reintentos, programar el siguiente if attempt < kc.maxRetries { kc.programarReintento(evt, id, firmado, gz64, hashArchivo, datosFactura, attempt+1, estadoSoap, numeroFactura, prevEstado) } else { // No quedan reintentos, manejar como error final kc.manejarErrorFinal(ctx, evt, id, estadoSoap, numeroFactura, lastErr.Error()) } return } // Cancelar cualquier reintento pendiente kc.cancelarReintentosPendientes(id) // Enviar mensaje de resultado exitoso res := ResultMessage{ TransaccionID: id, Estado: "FINALIZADA", NumeroFactura: numeroFactura, CodigoAutorizacion: soapResp.CodigoEstado, } if err := kc.resultWriter.WriteMessages(ctx, kafka.Message{Key: []byte(id), Value: utils.MustMarshal(res)}, ); err != nil { kc.logger.Error.Printf("[ID:%s][Attempt:%d] Error enviando resultado: %v", id, attempt, err) } else { kc.logger.Info.Printf("[ID:%s][Attempt:%d] Factura finalizada exitosamente", id, attempt) } } // manejarErrorFinal gestiona el caso cuando se agotan todos los reintentos func (kc *KafkaConsumer) manejarErrorFinal(ctx context.Context, evt models.FacturaEventModel, id string, estadoSoap string, numeroFactura string, errMsg string) { kc.logger.Error.Printf("[ID:%s] Todos los reintentos agotados: %s", id, errMsg) // Cancelar cualquier reintento pendiente kc.cancelarReintentosPendientes(id) // Crear el mensaje JSON para DLQ msgJson, _ := json.Marshal(evt) // Enviar mensaje de resultado de error if err := kc.resultWriter.WriteMessages(ctx, kafka.Message{ Key: []byte(id), Value: utils.MustMarshal(ResultMessage{ TransaccionID: id, Estado: "ERROR", NumeroFactura: numeroFactura, MensajeError: errMsg, }), }); err != nil { kc.logger.Error.Printf("[ID:%s] Error enviando resultado de error: %v", id, err) } // Registrar en tabla facturacion_dlq if err := kc.dbManager.InsertarEnDLQ(ctx, id, string(msgJson), estadoSoap); err != nil { kc.logger.Error.Printf("[ID:%s] Error insertando en DLQ BD: %v", id, err) } else { kc.logger.Info.Printf("[ID:%s] Registrada en DLQ de base de datos", id) } // Verificar si ya existe en DLQ inDLQ, err := kc.dbManager.IsFacturaInDLQ(ctx, id) if err != nil { kc.logger.Error.Printf("[ID:%s] Error verificando DLQ: %v", id, err) } else if !inDLQ { if err := kc.dlqWriter.WriteMessages(ctx, kafka.Message{ Key: []byte(id), Value: msgJson, }); err != nil { kc.logger.Error.Printf("[ID:%s] Error enviando a DLQ Kafka: %v", id, err) } else { kc.logger.Info.Printf("[ID:%s] Enviada a DLQ Kafka", id) } } else { kc.logger.Info.Printf("[ID:%s] Ya existente en DLQ, no se enviará nuevamente a Kafka DLQ", id) } // Enviar notificación por correo notifyEmail := os.Getenv("NOTIFY_EMAIL") if notifyEmail != "" { subject := fmt.Sprintf("DLQ: %s", id) body := fmt.Sprintf("ID: %s\nError: %s\nPayload: %s", id, errMsg, string(msgJson)) if err := kc.notifier.SendEmail(notifyEmail, subject, body); err != nil { kc.logger.Error.Printf("[ID:%s] Error enviando notificación por correo: %v", id, err) } } } // procesarEstado procesa un mensaje de estado func (kc *KafkaConsumer) procesarEstado(ctx context.Context, msg KafkaMessage) error { // Validar ID if msg.TransaccionID == "" || !utils.IsValidUUID(msg.TransaccionID) { return database.ErrInvalidUUID } id := msg.TransaccionID // Aplicar bloqueo para evitar procesamiento concurrente del mismo ID if !kc.acquireProcessingLock(id) { kc.logger.Info.Printf("[ID:%s] Estado: Ya está siendo procesada por otro worker, ignorando", id) return nil } defer kc.releaseProcessingLock(id) // Verificar si ya fue procesada y enviada a DLQ inDLQ, err := kc.dbManager.IsFacturaInDLQ(ctx, id) if err != nil { return fmt.Errorf("error verificando DLQ: %w", err) } if inDLQ { kc.logger.Info.Printf("[ID:%s] Estado: Ya fue procesada anteriormente y enviada a DLQ, omitiendo", id) // Cancelar cualquier reintento pendiente kc.cancelarReintentosPendientes(id) return database.ErrAlreadyProcessed } // Obtener estado anterior _, _, prevEstado, _, err := kc.dbManager.VerificarExistenciaFactura(ctx, id) if err != nil { return fmt.Errorf("error obteniendo factura: %w", err) } // Actualizar estado if err := kc.dbManager.ActualizarEstadoFactura(ctx, id, msg.Estado, "", prevEstado, "Estado recibido de FACTURACION_ESTADO", ""); err != nil { return fmt.Errorf("error actualizando estado: %w", err) } kc.logger.Info.Printf("[ID:%s] Estado: %s → %s", id, prevEstado, msg.Estado) return nil } // procesa un mensaje del topic de facturación func (kc *KafkaConsumer) processFacturaMessage(ctx context.Context, m kafka.Message) error { // Verificar ID en el mensaje id := string(m.Key) if id == "" { id = utils.GenerateUUID() kc.logger.Info.Printf("Generado nuevo ID para mensaje sin key: %s", id) } kc.logger.Info.Printf("[ID:%s] Procesando mensaje de facturación", id) // Deserializar mensaje var evt models.FacturaEventModel if err := json.Unmarshal(m.Value, &evt); err != nil { kc.logger.Error.Printf("[ID:%s] Error deserializando mensaje: %v", id, err) return fmt.Errorf("%w: %v", database.ErrInvalidMessage, err) } // Validar ID if evt.TransaccionID == "" || !utils.IsValidUUID(evt.TransaccionID) { kc.logger.Error.Printf("[ID:%s] ID de transacción inválido: %s", id, evt.TransaccionID) return database.ErrInvalidUUID } // Procesar la factura return kc.procesarFactura(ctx, evt) } // Modificar el método para integrar verificación del estado en SIAT func (kc *KafkaConsumer) handleEstadoMessage(ctx context.Context, m kafka.Message) error { // Verificar ID en el mensaje id := string(m.Key) if id == "" { kc.logger.Error.Printf("Mensaje de estado sin ID, ignorando") return errors.New("mensaje de estado sin ID") } kc.logger.Info.Printf("[ID:%s] Procesando mensaje de estado", id) // Aplicar bloqueo para evitar procesamiento concurrente del mismo ID if !kc.acquireProcessingLock(id) { kc.logger.Info.Printf("[ID:%s] Estado: Ya está siendo procesada por otro worker, ignorando", id) return nil } defer kc.releaseProcessingLock(id) vRegCuf, idRegCompras, prevEstado, err := kc.dbManager.GetFacturacionFacturas(ctx, id) if err != nil { return fmt.Errorf("Error obteniendo datos de la tabla RegistroEmpresa: %w", err) } // Paso 2: Obtener datos de las tablas RegistroEmpresa - cuf - cufd datosFactura, err := kc.dbManager.GetRegEmpresaCuisCuf(ctx, idRegCompras) if err != nil { return fmt.Errorf("error obteniendo datos empresa: %w", err) } // Verificar si ya fue procesada y enviada a DLQ inDLQ, err := kc.dbManager.IsFacturaInDLQ(ctx, id) if err != nil { return fmt.Errorf("error verificando DLQ: %w", err) } if inDLQ { kc.logger.Info.Printf("[ID:%s] Estado: Ya fue procesada anteriormente y enviada a DLQ, omitiendo", id) // Cancelar cualquier reintento pendiente kc.cancelarReintentosPendientes(id) return database.ErrAlreadyProcessed } // Llamar al servicio SOAP de verificación de estado callCtx, callCancel := context.WithTimeout(ctx, soapCallTimeout) defer callCancel() soapResp, statusCode, reqBody, respBody, dur, err := kc.soapClient.CallVerificacionEstadoFactura( callCtx, datosFactura, vRegCuf) kc.logger.Info.Printf("[ID:%s] Estado: Respuesta verificación SIAT: %+v", id, soapResp) //MensajesList verificarEstadoSoap mensajesJSON, jsonErr := json.Marshal(soapResp.MensajesList) if jsonErr != nil { kc.logger.Error.Printf("[ID:%s] Estado: Error al obtener el mensaje del servicio SOAP: %v", id, jsonErr) return fmt.Errorf("Error al obtener el mensaje del servicio SOAP: %w", jsonErr) } // Registrar la interacción con el servicio usando el método helper regErr := kc.registrarInteraccion(ctx, id, "VERIFICACION_ESTADO_FACTURA", reqBody, respBody, statusCode, dur, err == nil, string(mensajesJSON)) if regErr != nil { kc.logger.Error.Printf("[ID:%s] Estado: Error registrando interacción: %v", id, regErr) } if err != nil { kc.logger.Error.Printf("[ID:%s] Estado: Error llamando al servicio de verificación: %v", id, err) // Si falla la verificación, continuamos con la actualización de estado solicitada kc.logger.Info.Printf("[ID:%s] Estado: Continuando con actualización de estado solicitada a pesar del error en SIAT", id) // Actualizar estado con el mensaje recibido if err := kc.dbManager.ActualizarEstadoFactura(ctx, id, "PENDIENTE", "", prevEstado, "Estado recibido de FACTURACION_ESTADO - Verificación SIAT falló", ""); err != nil { return fmt.Errorf("error actualizando estado: %w", err) } kc.logger.Info.Printf("[ID:%s] Estado: %s → %s (Verificación SIAT falló)", id, prevEstado, "PENDIENTE") return nil } return nil } // handleAnularMessage processes a message from the FACTURACION_ANULAR topic func (kc *KafkaConsumer) handleAnularMessage(ctx context.Context, m kafka.Message) error { // Check ID in the message id := string(m.Key) if id == "" { kc.logger.Error.Printf("Anulación message without ID, ignoring") return errors.New("anulación message without ID") } kc.logger.Info.Printf("[ID:%s] Processing invoice cancellation message", id) // Aplicar bloqueo para evitar procesamiento concurrente del mismo ID if !kc.acquireProcessingLock(id) { kc.logger.Info.Printf("[ID:%s] Anulación: Ya está siendo procesada por otro worker, ignorando", id) return nil } defer kc.releaseProcessingLock(id) // Check if already processed and sent to DLQ inDLQ, err := kc.dbManager.IsFacturaInDLQ(ctx, id) if err != nil { return fmt.Errorf("error checking DLQ: %w", err) } if inDLQ { kc.logger.Info.Printf("[ID:%s] Anulación: Invoice already processed and sent to DLQ, skipping", id) // Cancelar cualquier reintento pendiente kc.cancelarReintentosPendientes(id) return database.ErrAlreadyProcessed } // Get the CUF, company registration ID and previous state vRegCuf, idRegCompras, prevEstado, err := kc.dbManager.GetFacturacionFacturas(ctx, id) if err != nil { return fmt.Errorf("error getting data from RegistroEmpresa table: %w", err) } // Verify invoice exists and is in FINALIZADA state if prevEstado != "FINALIZADA" { kc.logger.Error.Printf("[ID:%s] Anulación: Invoice cannot be cancelled - current state: %s", id, prevEstado) return fmt.Errorf("invoice must be in FINALIZADA state to be cancelled, current state: %s", prevEstado) } // Get company data, CUIS and CUFD datosFactura, err := kc.dbManager.GetRegEmpresaCuisCuf(ctx, idRegCompras) if err != nil { return fmt.Errorf("error getting company data: %w", err) } // Parse the cancellation reason code from the message (default to 1 if not provided) var codigoMotivo int = 1 var anulacionData struct { CodigoMotivo int `json:"codigo_motivo"` } if err := json.Unmarshal(m.Value, &anulacionData); err == nil && anulacionData.CodigoMotivo > 0 { codigoMotivo = anulacionData.CodigoMotivo } // Call the SOAP service with retries var lastErr error for attempt := 0; attempt <= kc.maxRetries; attempt++ { if attempt > 0 { kc.logger.Info.Printf("[ID:%s] Anulación: Retry %d for cancellation", id, attempt) // Usar select con context para asegurar que no se bloquee durante el shutdown select { case <-time.After(time.Duration(2<