Vue 代码

        // 初始化EventSource
        initEventSource(url) {
          const token = getAccessToken();
          const eventSource = new EventSourcePolyfill(url, {
            headers: {
              'Authorization': `Bearer ${token}`,
              'tenant-id': getTenantId(),
            }
          });

          eventSource.onerror = (e) => {
            console.log("SSE连接错误", e);
            if (e.readyState === EventSource.CLOSED || eventSource.readyState === EventSource.CONNECTING) {
              console.log("SSE连接已关闭或正在重连");
            } else {
              // 当发生错误时,尝试重新连接
              if (reconnectAttempts < maxReconnectAttempts) {
                console.log(`尝试第${reconnectAttempts + 1}次重连`);
                reconnectAttempts++;
                setTimeout(() => {
                  eventSource.close(); // 关闭当前连接
                  this.initEventSource(url); // 重新初始化EventSource
                }, reconnectDelay * reconnectAttempts);
              } else {
                console.error("达到最大重连次数,不再尝试重连");
              }
            }
          };

          eventSource.addEventListener("message", res => {
            const data = JSON.parse(res.data)
            if (data.type == 2) {
              this.unreadCount = data.content;
            }
            if (data.type == 1) {
              this.createNotify(data)
            }
          })
        },

后端采用 Redis 的发布/订阅(Pub/Sub)频道作为消息管道,因此能够兼容分布式(多实例)部署的服务

Java 监听接口

 /**
  * Streams server-sent events for the currently logged-in user.
  *
  * <p>The stream is fed by the user's Redis channel
  * ({@code SseService.getDestination(loginUserId)}); every channel message is
  * passed through {@code sendMsg} before being emitted.
  *
  * <p>The user id is added to {@code USER_IDS} when the stream is subscribed and
  * removed again when it terminates or is cancelled, so the periodic heartbeat
  * task does not keep publishing for users whose connection is gone.
  *
  * @return a stream of JSON-encoded messages for the logged-in user
  */
 @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamEvents() {
        Long loginUserId = SecurityFrameworkUtils.getLoginUserId();
        SseMessageVO heartbeat = new SseMessageVO()
                .setType(SseNotifyTypeEnum.HEARTBEAT.getType())
                .setUserId(loginUserId)
                .setContent("Heartbeat");
        return reactiveRedisOperations.listenToChannel(SseService.getDestination(loginUserId))
                .map(data -> sendMsg(loginUserId, data.getMessage(), heartbeat))
                .publishOn(Schedulers.boundedElastic())
                .doOnSubscribe(subscription -> {
                    // Register for periodic heartbeats only once the stream is live.
                    USER_IDS.add(loginUserId);
                    // Send one heartbeat on subscribe to confirm the connection.
                    heartbeat(heartbeat);
                })
                // Runs on completion, error and client disconnect (cancel).
                // NOTE(review): if USER_IDS is a Set and one user holds several
                // connections on this instance, closing one of them unregisters the
                // user for all of them until the next (re)subscribe - confirm this
                // is acceptable or switch USER_IDS to a per-user counter.
                .doFinally(signal -> USER_IDS.remove(loginUserId));
    }

    /**
     * Chooses the JSON payload to emit for one raw channel message.
     *
     * @param loginUserId id of the user who owns the stream
     * @param sseMessage  raw JSON text received from the channel
     * @param heartbeat   message emitted instead when the parsed message is
     *                    {@code null} or carries a different user id
     * @return the parsed message re-serialised as JSON when it is addressed to
     *         {@code loginUserId}; otherwise the JSON form of {@code heartbeat}
     */
    private String sendMsg(Long loginUserId,String sseMessage,SseMessageVO heartbeat){
        SseMessageVO parsed = JSONUtil.toBean(sseMessage, SseMessageVO.class);
        boolean addressedToCaller = parsed != null
                && Objects.equals(parsed.getUserId(), loginUserId);
        SseMessageVO outgoing = addressedToCaller ? parsed : heartbeat;
        return JSONUtil.toJsonStr(outgoing);
    }

    /**
     * Publishes a single heartbeat to the user's channel one second from now.
     *
     * <p>The delay is handled by Reactor's timer instead of a dedicated
     * {@code ScheduledExecutorService}, so no thread is created and torn down
     * for every call. The publish is fire-and-forget: the result is not awaited.
     *
     * @param heartbeat the heartbeat message to publish; its user id selects the channel
     */
    private void heartbeat(SseMessageVO heartbeat) {
        // Fully qualified so the method does not depend on additional imports.
        reactor.core.publisher.Mono.delay(java.time.Duration.ofSeconds(1))
                .flatMap(tick -> sseService.publishEventToChannel(heartbeat))
                .subscribe();
    }

    /**
     * Keep-alive: starts a background task that, every 10 seconds, publishes a
     * heartbeat message to the channel of every user id in {@code USER_IDS}.
     *
     * <p>The task runs on a single daemon thread so it cannot keep the JVM alive
     * on shutdown. Runtime failures inside one run are reported and contained:
     * {@code scheduleAtFixedRate} suppresses all subsequent executions once a run
     * throws, which would otherwise stop the heartbeats silently and permanently.
     */
    @PostConstruct
    public void heartbeatTimer() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "sse-heartbeat");
            thread.setDaemon(true);
            return thread;
        });
        executor.scheduleAtFixedRate(() -> {
            try {
                if (CollectionUtil.isNotEmpty(USER_IDS)){
                    for (Long userId : USER_IDS) {
                        String message = "Heartbeat at " + LocalDateTime.now();
                        SseMessageVO heartbeat = new SseMessageVO()
                                .setType(SseNotifyTypeEnum.HEARTBEAT.getType())
                                .setUserId(userId)
                                .setContent(message);
                        sseService.publishEventToChannel(heartbeat).subscribe();
                    }
                }
            } catch (RuntimeException e) {
                // Report through the thread's handler instead of letting the
                // exception escape, so the next run still happens.
                Thread current = Thread.currentThread();
                current.getUncaughtExceptionHandler().uncaughtException(current, e);
            }
        }, 0, 10, TimeUnit.SECONDS);
    }

Java 提交数据服务

/**
 * Publishes SSE messages to per-user Redis channels.
 */
@Component
public class SseService {

    /** Prefix of every per-user channel name; the user id is appended to it. */
    private static final String CHANNEL_PREFIX = "event-channel-user:";

    @Resource
    private ReactiveRedisOperations<String, String> reactiveRedisOperations;

    /**
     * Builds the channel name for a user.
     *
     * @param userId id of the user; a {@code null} id yields the text {@code "null"} after the prefix
     * @return the channel prefix followed by the user id
     */
    public static String getDestination(Long userId){
        return CHANNEL_PREFIX.concat(String.valueOf(userId));
    }

    /**
     * Serialises the message to JSON and sends it to the channel of the user it is addressed to.
     *
     * @param sseMessageVO message to publish; its user id selects the channel
     * @return the {@code Mono} produced by {@code convertAndSend} for this publish
     */
    public Mono<Long> publishEventToChannel(SseMessageVO sseMessageVO) {
        String channel = getDestination(sseMessageVO.getUserId());
        String payload = JSONUtil.toJsonStr(sseMessageVO);
        return reactiveRedisOperations.convertAndSend(channel, payload);
    }

}

Logo

ModelScope旨在打造下一代开源的模型即服务共享平台,为泛AI开发者提供灵活、易用、低成本的一站式模型服务产品,让模型应用更简单!

更多推荐