2025-10-29 06:14:57 -04:00

208 lines
6.2 KiB
Go

package fetcher
import (
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/tuusuario/go-sync-service/internal/config"
"github.com/tuusuario/go-sync-service/internal/db"
"github.com/tuusuario/go-sync-service/internal/domain/dto"
"github.com/tuusuario/go-sync-service/internal/domain/ports"
"github.com/tuusuario/go-sync-service/internal/http"
"github.com/tuusuario/go-sync-service/internal/utils"
"github.com/tuusuario/go-sync-service/metrics"
"gorm.io/gorm"
)
func SyncData(redis ports.RedisConfigProvider, database *gorm.DB, job dto.CronJob) {
start := time.Now()
logPrefix := fmt.Sprintf("[🧩 Job: %s] ", job.Nombre)
config.Log.Printf("%s Iniciando sincronización...", logPrefix)
var dbcore ports.Database = db.NewGormDatabase(database)
var hasError bool
http.InitClient()
if len(job.Configuracion.Proceso) == 0 {
config.Log.Printf(" %s ⚠️ No hay procesos configurados para este job", logPrefix)
goto END
}
for _, proceso := range job.Configuracion.Proceso {
config.Log.Printf(logPrefix+" Iniciando proceso %s", proceso)
jobIndividual, err := utils.CargarDesdeRedis[dto.JobConfig](redis, proceso)
if err != nil {
config.Log.Printf(logPrefix+" ❌ Error al obtener configuración del proceso: %v", err)
hasError = true
continue
}
//Obtener Session
session, err := http.GetSession(redis, job, jobIndividual.Auth, dbcore, logPrefix)
if err != nil {
config.Log.Println(logPrefix + " ❌ No se pudo obtener sesión")
hasError = true
continue
}
if session == nil || session.Headers == nil {
config.Log.Println(logPrefix + " ❌ Sesión inválida o vacía")
hasError = true
continue
}
jobIndividual.Service.Headers = session.Headers
if jobIndividual.Service.GQL {
jobIndividual.Service.Headers["origin"] = config.GlobalConfig.TM_HEADER_ORIGIN
jobIndividual.Service.Headers["tenant-name"] = config.GlobalConfig.TM_HEADER_TENANT_NAME
jobIndividual.Service.Headers["User-Agent"] = config.GlobalConfig.TM_HEADER_USER_AGENT
}
response, err := FetchAllPaginatedManual[map[string]interface{}](session.EndPoint, jobIndividual.Service, logPrefix)
if err != nil {
config.Log.Printf(logPrefix+" ❌ Error al obtener data: %v", err)
hasError = true
continue
}
config.Log.Printf("%s Cantidad de elementos: %v", logPrefix, len(*response))
err = dbcore.SyncRows(jobIndividual.Persistencia, response, job.UnidadNegocio.CompanyDB)
if err != nil {
config.Log.Printf(logPrefix+" ❌ Error al guardar en base de datos: %v", err)
hasError = true
}
}
END:
duration := time.Since(start).Seconds()
jobName := job.Nombre
metrics.CronDuration.WithLabelValues(jobName).Observe(duration)
if hasError {
metrics.CronError.WithLabelValues(jobName).Inc()
metrics.CronLastError.WithLabelValues(jobName).Set(float64(time.Now().Unix()))
} else {
metrics.CronSuccess.WithLabelValues(jobName).Inc()
metrics.CronLastSuccess.WithLabelValues(jobName).Set(float64(time.Now().Unix()))
}
config.Log.Printf("%s ⏱ Duración total: %.2fs", logPrefix, duration)
}
func FetchAllPaginatedManual[T any](host string, service dto.ServiceConfig, logPrefix string) (*[]T, error) {
var all []T
// REST paginación
if service.Rest != nil && service.Rest.Pagination != nil && service.Rest.Pagination.Enabled {
skip := service.Rest.Pagination.Skip
top := service.Rest.Pagination.Top
for {
// Actualizar parámetros de paginación
service.Rest.Query["$skip"] = strconv.Itoa(skip)
service.Rest.Query["$top"] = strconv.Itoa(top)
resp, err := http.SendRequest(host, service)
if err != nil {
return nil, fmt.Errorf("%s ❌ error en la petición: %w", logPrefix, err)
}
var result struct {
Value []T `json:"value"`
}
if err := json.Unmarshal(resp.Body(), &result); err != nil {
return nil, fmt.Errorf("%s ❌ error parseando respuesta: %w", logPrefix, err)
}
if len(result.Value) == 0 {
config.Log.Printf(" %s ❌ No hay más elementos", logPrefix)
break
}
all = append(all, result.Value...)
skip += top
}
return &all, nil
}
// GraphQL paginación
if service.GraphQL != nil && service.GraphQL.Pagination != nil && service.GraphQL.Pagination.Enabled {
for {
resp, err := http.SendRequest(host, service)
if err != nil {
return nil, fmt.Errorf("%s ❌ error en la petición: %w", logPrefix, err)
}
var raw map[string]any
if err := json.Unmarshal(resp.Body(), &raw); err != nil {
return nil, fmt.Errorf("%s ❌ error parseando respuesta GraphQL: %w", logPrefix, err)
}
data, ok := raw["data"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("%s ❌ no se encontró 'data' en la respuesta GraphQL", logPrefix)
}
root, ok := data[service.GraphQL.RootField].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("%s ❌ no se encontró '%v' en la respuesta GraphQL", logPrefix, service.GraphQL.RootField)
}
// Obtener y parsear filas
rows, ok := root[service.GraphQL.RowField].([]interface{})
if !ok {
return nil, fmt.Errorf("%s ❌ no se encontró '%v' en la respuesta GraphQL", logPrefix, service.GraphQL.RowField)
}
for _, r := range rows {
jsonRow, _ := json.Marshal(r)
var item T
if err := json.Unmarshal(jsonRow, &item); err != nil {
config.Log.Printf("%s ⚠️ error parseando fila: %v", logPrefix, err)
continue
}
all = append(all, item)
}
// Evaluar paginación
meta, ok := root["meta"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("%s ❌ no se encontró 'meta' para paginación", logPrefix)
}
hasNext, ok := meta["hasNextPage"].(bool)
if !ok || !hasNext {
break
}
// Avanzar cursor (página)
if nextPage, ok := meta["next"]; ok {
service.GraphQL.Variables[service.GraphQL.Pagination.CursorParam] = nextPage
} else {
break // no hay campo next
}
}
return &all, nil
}
// Sin paginación
resp, err := http.SendRequest(host, service)
if err != nil {
return nil, fmt.Errorf("%s ❌ error en la petición: %w", logPrefix, err)
}
var result struct {
Value []T `json:"value"`
}
if err := json.Unmarshal(resp.Body(), &result); err != nil {
return nil, fmt.Errorf("%s ❌ error parseando respuesta final: %w", logPrefix, err)
}
all = append(all, result.Value...)
return &all, nil
}