package fetcher import ( "encoding/json" "fmt" "strconv" "time" "github.com/tuusuario/go-sync-service/internal/config" "github.com/tuusuario/go-sync-service/internal/db" "github.com/tuusuario/go-sync-service/internal/domain/dto" "github.com/tuusuario/go-sync-service/internal/domain/ports" "github.com/tuusuario/go-sync-service/internal/http" "github.com/tuusuario/go-sync-service/internal/utils" "github.com/tuusuario/go-sync-service/metrics" "gorm.io/gorm" ) func SyncData(redis ports.RedisConfigProvider, database *gorm.DB, job dto.CronJob) { start := time.Now() logPrefix := fmt.Sprintf("[🧩 Job: %s] ", job.Nombre) config.Log.Printf("%s Iniciando sincronización...", logPrefix) var dbcore ports.Database = db.NewGormDatabase(database) var hasError bool http.InitClient() if len(job.Configuracion.Proceso) == 0 { config.Log.Printf(" %s ⚠️ No hay procesos configurados para este job", logPrefix) goto END } for _, proceso := range job.Configuracion.Proceso { config.Log.Printf(logPrefix+" Iniciando proceso %s", proceso) jobIndividual, err := utils.CargarDesdeRedis[dto.JobConfig](redis, proceso) if err != nil { config.Log.Printf(logPrefix+" ❌ Error al obtener configuración del proceso: %v", err) hasError = true continue } //Obtener Session session, err := http.GetSession(redis, job, jobIndividual.Auth, dbcore, logPrefix) if err != nil { config.Log.Println(logPrefix + " ❌ No se pudo obtener sesión") hasError = true continue } if session == nil || session.SessionId == "" { config.Log.Println(logPrefix + " ❌ Sesión inválida o vacía") hasError = true continue } if jobIndividual.Service.GQL { jobIndividual.Service.Headers["Authorization"] = "Bearer " + session.SessionId } else { jobIndividual.Service.Headers["Cookie"] = "B1SESSION=" + session.SessionId } response, err := FetchAllPaginatedManual[map[string]interface{}](session.EndPoint, jobIndividual.Service, logPrefix) if err != nil { config.Log.Printf(logPrefix+" ❌ Error al obtener data: %v", err) hasError = true continue } config.Log.Printf("%s Cantidad de elementos: %v", logPrefix, len(*response)) err = dbcore.SyncRows(jobIndividual.Persistencia, response, job.UnidadNegocio.CompanyDB) if err != nil { config.Log.Printf(logPrefix+" ❌ Error al guardar en base de datos: %v", err) hasError = true } } END: duration := time.Since(start).Seconds() jobName := job.Nombre metrics.CronDuration.WithLabelValues(jobName).Observe(duration) if hasError { metrics.CronError.WithLabelValues(jobName).Inc() metrics.CronLastError.WithLabelValues(jobName).Set(float64(time.Now().Unix())) } else { metrics.CronSuccess.WithLabelValues(jobName).Inc() metrics.CronLastSuccess.WithLabelValues(jobName).Set(float64(time.Now().Unix())) } config.Log.Printf("%s ⏱ Duración total: %.2fs", logPrefix, duration) } func FetchAllPaginatedManual[T any](host string, service dto.ServiceConfig, logPrefix string) (*[]T, error) { var all []T // REST paginación if service.Rest != nil && service.Rest.Pagination != nil && service.Rest.Pagination.Enabled { skip := service.Rest.Pagination.Skip top := service.Rest.Pagination.Top for { // Actualizar parámetros de paginación service.Rest.Query["$skip"] = strconv.Itoa(skip) service.Rest.Query["$top"] = strconv.Itoa(top) resp, err := http.SendRequest(host, service) if err != nil { return nil, fmt.Errorf("%s ❌ error en la petición: %w", logPrefix, err) } var result struct { Value []T `json:"value"` } if err := json.Unmarshal(resp.Body(), &result); err != nil { return nil, fmt.Errorf("%s ❌ error parseando respuesta: %w", logPrefix, err) } if len(result.Value) == 0 { config.Log.Printf(" %s ❌ No hay más elementos", logPrefix) break } all = append(all, result.Value...) skip += top } return &all, nil } // GraphQL paginación if service.GraphQL != nil && service.GraphQL.Pagination != nil && service.GraphQL.Pagination.Enabled { for { resp, err := http.SendRequest(host, service) if err != nil { return nil, fmt.Errorf("%s ❌ error en la petición: %w", logPrefix, err) } var raw map[string]any if err := json.Unmarshal(resp.Body(), &raw); err != nil { return nil, fmt.Errorf("%s ❌ error parseando respuesta GraphQL: %w", logPrefix, err) } data, ok := raw["data"].(map[string]interface{}) if !ok { return nil, fmt.Errorf("%s ❌ no se encontró 'data' en la respuesta GraphQL", logPrefix) } root, ok := data[service.GraphQL.RootField].(map[string]interface{}) if !ok { return nil, fmt.Errorf("%s ❌ no se encontró '%v' en la respuesta GraphQL", logPrefix, service.GraphQL.RootField) } // Obtener y parsear filas rows, ok := root[service.GraphQL.RowField].([]interface{}) if !ok { return nil, fmt.Errorf("%s ❌ no se encontró '%v' en la respuesta GraphQL", logPrefix, service.GraphQL.RowField) } for _, r := range rows { jsonRow, _ := json.Marshal(r) var item T if err := json.Unmarshal(jsonRow, &item); err != nil { config.Log.Printf("%s ⚠️ error parseando fila: %v", logPrefix, err) continue } all = append(all, item) } // Evaluar paginación meta, ok := root["meta"].(map[string]interface{}) if !ok { return nil, fmt.Errorf("%s ❌ no se encontró 'meta' para paginación", logPrefix) } hasNext, ok := meta["hasNextPage"].(bool) if !ok || !hasNext { break } // Avanzar cursor (página) if nextPage, ok := meta["next"]; ok { service.GraphQL.Variables[service.GraphQL.Pagination.CursorParam] = nextPage } else { break // no hay campo next } } return &all, nil } // Sin paginación resp, err := http.SendRequest(host, service) if err != nil { return nil, fmt.Errorf("%s ❌ error en la petición: %w", logPrefix, err) } var result struct { Value []T `json:"value"` } if err := json.Unmarshal(resp.Body(), &result); err != nil { return nil, fmt.Errorf("%s ❌ error parseando respuesta final: %w", logPrefix, err) } all = append(all, result.Value...) return &all, nil }