219 lines
7.1 KiB
Go
219 lines
7.1 KiB
Go
package fetcher
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/tuusuario/go-sync-service/internal/email"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/tuusuario/go-sync-service/internal/config"
|
|
"github.com/tuusuario/go-sync-service/internal/db"
|
|
"github.com/tuusuario/go-sync-service/internal/domain/dto"
|
|
"github.com/tuusuario/go-sync-service/internal/domain/ports"
|
|
"github.com/tuusuario/go-sync-service/internal/http"
|
|
"github.com/tuusuario/go-sync-service/internal/utils"
|
|
"github.com/tuusuario/go-sync-service/metrics"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
func SyncData(redis ports.RedisConfigProvider, database *gorm.DB, job dto.CronJob, email email.SMTPEmailService) {
|
|
start := time.Now()
|
|
logPrefix := fmt.Sprintf("[🧩 Job: %s] ", job.Nombre)
|
|
config.Log.Printf("%s Iniciando sincronización...", logPrefix)
|
|
|
|
var dbcore ports.Database = db.NewGormDatabase(database)
|
|
var hasError bool
|
|
var errorMessages []string // Collects all error messages
|
|
|
|
http.InitClient()
|
|
|
|
if len(job.Configuracion.Proceso) == 0 {
|
|
config.Log.Printf(" %s ⚠️ No hay procesos configurados para este job", logPrefix)
|
|
goto END
|
|
}
|
|
|
|
for _, proceso := range job.Configuracion.Proceso {
|
|
config.Log.Printf(logPrefix+" Iniciando proceso %s", proceso)
|
|
|
|
jobIndividual, err := utils.CargarDesdeRedis[dto.JobConfig](redis, proceso)
|
|
if err != nil {
|
|
config.Log.Printf(logPrefix+" ❌ Error al obtener configuración del proceso: %v", err)
|
|
errorMessages = append(errorMessages, fmt.Sprintf("Error en proceso %s: %v", proceso, err))
|
|
hasError = true
|
|
continue
|
|
}
|
|
//Obtener Session
|
|
session, err := http.GetSession(redis, job, jobIndividual.Auth, dbcore, logPrefix)
|
|
if err != nil {
|
|
config.Log.Println(logPrefix + " ❌ No se pudo obtener sesión")
|
|
errorMessages = append(errorMessages, fmt.Sprintf("❌ No se pudo obtener sesión para proceso %s: %v", proceso, err))
|
|
hasError = true
|
|
continue
|
|
}
|
|
if session == nil || session.Headers == nil {
|
|
config.Log.Println(logPrefix + " ❌ Sesión inválida o vacía")
|
|
errorMessages = append(errorMessages, fmt.Sprintf("❌ Sesión inválida o vacía para proceso %s", proceso))
|
|
hasError = true
|
|
continue
|
|
}
|
|
|
|
jobIndividual.Service.Headers = session.Headers
|
|
if jobIndividual.Service.GQL {
|
|
jobIndividual.Service.Headers["origin"] = config.GlobalConfig.TM_HEADER_ORIGIN
|
|
jobIndividual.Service.Headers["tenant-name"] = config.GlobalConfig.TM_HEADER_TENANT_NAME
|
|
jobIndividual.Service.Headers["User-Agent"] = config.GlobalConfig.TM_HEADER_USER_AGENT
|
|
}
|
|
|
|
response, err := FetchAllPaginatedManual[map[string]interface{}](session.EndPoint, jobIndividual.Service, logPrefix)
|
|
if err != nil {
|
|
config.Log.Printf(logPrefix+" ❌ Error al obtener data: %v", err)
|
|
errorMessages = append(errorMessages, fmt.Sprintf("❌ Error al obtener data para proceso %s: %v", proceso, err))
|
|
hasError = true
|
|
continue
|
|
}
|
|
|
|
config.Log.Printf("%s Cantidad de elementos: %v", logPrefix, len(*response))
|
|
|
|
err = dbcore.SyncRows(jobIndividual.Persistencia, response, job.UnidadNegocio.CompanyDB)
|
|
if err != nil {
|
|
config.Log.Printf(logPrefix+" ❌ Error al guardar en base de datos: %v", err)
|
|
errorMessages = append(errorMessages, fmt.Sprintf("❌ Error al guardar en base de datos para proceso %s: %v", proceso, err))
|
|
hasError = true
|
|
}
|
|
}
|
|
|
|
END:
|
|
duration := time.Since(start).Seconds()
|
|
jobName := job.Nombre
|
|
|
|
metrics.CronDuration.WithLabelValues(jobName).Observe(duration)
|
|
|
|
if hasError {
|
|
errMessage := strings.Join(errorMessages, "\n") // Combine all errors into one message
|
|
_ = email.PrepareEmail("ERROR AL EJECUTAR EL JOB: "+job.Nombre, errMessage, redis)
|
|
|
|
metrics.CronError.WithLabelValues(jobName).Inc()
|
|
metrics.CronLastError.WithLabelValues(jobName).Set(float64(time.Now().Unix()))
|
|
} else {
|
|
metrics.CronSuccess.WithLabelValues(jobName).Inc()
|
|
metrics.CronLastSuccess.WithLabelValues(jobName).Set(float64(time.Now().Unix()))
|
|
}
|
|
|
|
config.Log.Printf("%s ⏱ Duración total: %.2fs", logPrefix, duration)
|
|
}
|
|
|
|
func FetchAllPaginatedManual[T any](host string, service dto.ServiceConfig, logPrefix string) (*[]T, error) {
|
|
var all []T
|
|
|
|
// REST paginación
|
|
if service.Rest != nil && service.Rest.Pagination != nil && service.Rest.Pagination.Enabled {
|
|
skip := service.Rest.Pagination.Skip
|
|
top := service.Rest.Pagination.Top
|
|
|
|
for {
|
|
// Actualizar parámetros de paginación
|
|
service.Rest.Query["$skip"] = strconv.Itoa(skip)
|
|
service.Rest.Query["$top"] = strconv.Itoa(top)
|
|
|
|
resp, err := http.SendRequest(host, service)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%s ❌ error en la petición: %w", logPrefix, err)
|
|
}
|
|
|
|
var result struct {
|
|
Value []T `json:"value"`
|
|
}
|
|
if err := json.Unmarshal(resp.Body(), &result); err != nil {
|
|
return nil, fmt.Errorf("%s ❌ error parseando respuesta: %w", logPrefix, err)
|
|
}
|
|
|
|
if len(result.Value) == 0 {
|
|
config.Log.Printf(" %s ❌ No hay más elementos", logPrefix)
|
|
break
|
|
}
|
|
|
|
all = append(all, result.Value...)
|
|
skip += top
|
|
}
|
|
return &all, nil
|
|
}
|
|
|
|
// GraphQL paginación
|
|
if service.GraphQL != nil && service.GraphQL.Pagination != nil && service.GraphQL.Pagination.Enabled {
|
|
for {
|
|
resp, err := http.SendRequest(host, service)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%s ❌ error en la petición: %w", logPrefix, err)
|
|
}
|
|
|
|
var raw map[string]any
|
|
if err := json.Unmarshal(resp.Body(), &raw); err != nil {
|
|
return nil, fmt.Errorf("%s ❌ error parseando respuesta GraphQL: %w", logPrefix, err)
|
|
}
|
|
|
|
data, ok := raw["data"].(map[string]interface{})
|
|
if !ok {
|
|
return nil, fmt.Errorf("%s ❌ no se encontró 'data' en la respuesta GraphQL", logPrefix)
|
|
}
|
|
|
|
root, ok := data[service.GraphQL.RootField].(map[string]interface{})
|
|
if !ok {
|
|
return nil, fmt.Errorf("%s ❌ no se encontró '%v' en la respuesta GraphQL", logPrefix, service.GraphQL.RootField)
|
|
}
|
|
|
|
// Obtener y parsear filas
|
|
rows, ok := root[service.GraphQL.RowField].([]interface{})
|
|
if !ok {
|
|
return nil, fmt.Errorf("%s ❌ no se encontró '%v' en la respuesta GraphQL", logPrefix, service.GraphQL.RowField)
|
|
}
|
|
|
|
for _, r := range rows {
|
|
jsonRow, _ := json.Marshal(r)
|
|
|
|
var item T
|
|
if err := json.Unmarshal(jsonRow, &item); err != nil {
|
|
config.Log.Printf("%s ⚠️ error parseando fila: %v", logPrefix, err)
|
|
continue
|
|
}
|
|
all = append(all, item)
|
|
}
|
|
|
|
// Evaluar paginación
|
|
meta, ok := root["meta"].(map[string]interface{})
|
|
if !ok {
|
|
return nil, fmt.Errorf("%s ❌ no se encontró 'meta' para paginación", logPrefix)
|
|
}
|
|
|
|
hasNext, ok := meta["hasNextPage"].(bool)
|
|
if !ok || !hasNext {
|
|
break
|
|
}
|
|
|
|
// Avanzar cursor (página)
|
|
if nextPage, ok := meta["next"]; ok {
|
|
service.GraphQL.Variables[service.GraphQL.Pagination.CursorParam] = nextPage
|
|
} else {
|
|
break // no hay campo next
|
|
}
|
|
}
|
|
return &all, nil
|
|
}
|
|
|
|
// Sin paginación
|
|
resp, err := http.SendRequest(host, service)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%s ❌ error en la petición: %w", logPrefix, err)
|
|
}
|
|
var result struct {
|
|
Value []T `json:"value"`
|
|
}
|
|
if err := json.Unmarshal(resp.Body(), &result); err != nil {
|
|
return nil, fmt.Errorf("%s ❌ error parseando respuesta final: %w", logPrefix, err)
|
|
}
|
|
all = append(all, result.Value...)
|
|
|
|
return &all, nil
|
|
}
|