package com.example.fercoganbackend.controller; import com.example.fercoganbackend.entity.Lote; import com.example.fercoganbackend.service.LoteServiceWebFlux; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Flux; import java.util.List; import java.util.Optional; @RestController @RequestMapping("/api/webflux/lotes") public class LoteControllerWebFlux { @Autowired private LoteServiceWebFlux loteService; @GetMapping public java.util.List getAll() { return loteService.findAll(); } @GetMapping("/{id}") public ResponseEntity getById(@PathVariable Long id) { return loteService.findById(id) .map(ResponseEntity::ok) .orElse(ResponseEntity.notFound().build()); } @PostMapping public Lote create(@RequestBody Lote remate) { return loteService.save(remate); } @PutMapping("/{id}") public ResponseEntity update(@PathVariable Long id, @RequestBody Lote remate) { return loteService.findById(id) .map(r -> { remate.setId(id); Lote updated = loteService.save(remate); return ResponseEntity.ok(updated); }) .orElse(ResponseEntity.notFound().build()); } @DeleteMapping("/{id}") public ResponseEntity delete(@PathVariable Long id) { loteService.delete(id); return ResponseEntity.noContent().build(); } // -------------------- NUEVO: SSE por LOTE -------------------- // Endpoint que devuelve el flujo de actualizaciones solo para ese lote (id). @GetMapping(value = "/stream/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux streamLoteById(@PathVariable Long id) { // flujo de futuras actualizaciones (todos los lotes, luego filtramos por id) Flux updates = loteService.getSink().asFlux() .filter(l -> l != null && l.getId() != null && l.getId().equals(id)); // enviamos primero el estado actual (si existe), luego las actualizaciones Optional current = loteService.findById(id); if (current.isPresent()) { return Flux.concat(Flux.just(current.get()), updates); } else { // si no existe ahora, devolvemos solo futuras actualizaciones (por ejemplo creación posterior) return updates; } } @GetMapping(value = "/stream/cabanaid/{cabanaId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux streamLote(@PathVariable Long cabanaId) { // Flujo de futuras actualizaciones (todos los lotes, luego filtramos por cabanaId) Flux updates = loteService.getSink().asFlux() .filter(lote -> lote.getCabana().getId().equals(cabanaId)); // Enviamos primero el estado actual (todos los lotes de la cabaña), luego las actualizaciones List currentLotes = loteService.findByCabanaId(cabanaId); if (!currentLotes.isEmpty()) { return Flux.concat(Flux.fromIterable(currentLotes), updates); } else { // Si no existen lotes ahora, devolvemos solo futuras actualizaciones return updates; } } }