使用SSE实现网易云音乐同步进度实时更新

在开发橘子音乐应用的过程中,我们实现了网易云音乐歌单同步功能,为了提供更好的用户体验,需要实时显示同步进度。本文将详细介绍如何使用Server-Sent Events (SSE) 技术实现服务器向客户端推送实时进度更新。

什么是SSE?

Server-Sent Events (SSE) 是一种服务器推送技术,允许服务器通过HTTP连接向客户端发送事件流。与WebSocket不同,SSE是单向通信(仅从服务器到客户端),专门用于服务器主动推送数据给客户端的场景。

SSE具有以下特点:

  • 基于HTTP协议,无需额外的协议支持
  • 自动重连机制
  • 简单易用,浏览器原生支持(通过EventSource API)
  • 轻量级,适合服务器到客户端的单向数据流

实现架构

在橘子音乐项目中,我们的SSE实现架构如下:

  1. 后端服务(Java)

    • 处理歌单同步请求
    • 创建SSE连接
    • 发送进度更新事件
  2. 前端界面(Vue)

    • 创建和管理EventSource连接
    • 监听进度事件并更新UI
    • 处理连接异常情况

后端实现(Java)

NetEaseMusicServiceImpl.java中,我们实现了歌单同步功能并使用回调机制发送进度更新:

@Override
@Transactional(rollbackFor = Exception.class)
@CacheEvict(cacheNames = {"songCache", "playlistCache", "artistCache", "userFavoriteCache"}, allEntries = true)
public Map<String, Object> syncPlaylist(String userId, String playlistId, Consumer<Map<String, Object>> progressCallback) {
    Map<String, Object> result = new HashMap<>();
    try {
        // ... 获取歌单详情 ...
        
        List<Map<String, Object>> songs = (List<Map<String, Object>>) trackResponse.get("songs");
        int successCount = 0;
        int totalSongs = songs.size();
        
        // 发送初始进度
        if (progressCallback != null) {
            Map<String, Object> progressInfo = new HashMap<>();
            progressInfo.put("percentage", 0);
            progressInfo.put("syncedCount", 0);
            progressInfo.put("totalCount", totalSongs);
            progressInfo.put("playlistName", playlistName);
            progressInfo.put("playlistId", newPlaylistId);
            progressCallback.accept(progressInfo);
        }
        
        for (Map<String, Object> songInfo : songs) {
            try {
                syncSong(songInfo, newPlaylistId);
                successCount++;
                
                // 发送进度更新
                if (progressCallback != null) {
                    int percentage = (int) Math.floor((double) successCount / totalSongs * 100);
                    Map<String, Object> progressInfo = new HashMap<>();
                    progressInfo.put("percentage", percentage);
                    progressInfo.put("syncedCount", successCount);
                    progressInfo.put("totalCount", totalSongs);
                    progressInfo.put("playlistName", playlistName);
                    progressInfo.put("playlistId", newPlaylistId);
                    progressInfo.put("currentSong", songInfo.get("name"));
                    progressCallback.accept(progressInfo);
                }
            } catch (Exception e) {
                log.error("同步歌曲失败: {}", e.getMessage());
                // 继续同步其他歌曲
            }
        }
        
        // 发送完成进度
        if (progressCallback != null) {
            Map<String, Object> progressInfo = new HashMap<>();
            progressInfo.put("percentage", 100);
            progressInfo.put("syncedCount", successCount);
            progressInfo.put("totalCount", totalSongs);
            progressInfo.put("playlistName", playlistName);
            progressInfo.put("playlistId", newPlaylistId);
            progressInfo.put("completed", true);
            progressCallback.accept(progressInfo);
        }
        
        // ... 返回结果 ...
    } catch (Exception e) {
        // ... 错误处理 ...
    }
}

后端服务通过接受一个回调函数,在同步过程的关键节点发送进度更新,包括:

  • 同步开始时(进度为0%)
  • 每首歌曲同步完成后
  • 同步全部完成时(进度为100%)

进度信息包含多个字段:同步百分比、已同步歌曲数、总歌曲数、歌单名称、歌单ID、当前同步的歌曲名,以及是否完成标志。

前端实现(Vue)

NeteaseSyncDialog.vue中,我们实现了SSE客户端:

定义状态变量

const sseSource = ref<EventSource | null>(null) // SSE连接
const sseConnected = ref(false) // SSE连接状态
const sseConnectionPromise = ref<Promise<boolean> | null>(null) // SSE连接Promise
const syncingPlaylistIds = ref<string[]>([]) // 正在同步的歌单ID列表
const syncStarted = ref(false) // 同步是否已开始(控制返回按钮显示)

创建SSE连接

const createSseConnection = () => {
  if (!userInfo.value || !userInfo.value.userId) {
    console.error('无法创建SSE连接:用户信息不完整')
    return Promise.resolve(false)
  }
  
  // 如果已经有连接Promise,直接返回
  if (sseConnectionPromise.value && sseConnected.value) {
    console.log('SSE连接已存在,直接使用')
    return sseConnectionPromise.value
  }
  
  // 关闭之前的连接
  closeSseConnection()
  sseConnected.value = false
  syncStarted.value = false
  
  console.log('正在创建SSE连接,用户ID:', userInfo.value.userId)
  
  // 创建新的Promise
  sseConnectionPromise.value = new Promise<boolean>((resolve) => {
    try {
      // 创建连接超时计时器 (8秒)
      const connectionTimeout = setTimeout(() => {
        console.warn('SSE连接超时')
        resolve(false)
      }, 8000)
      
      // 创建新连接
      sseSource.value = syncNeteasePlaylist.subscribeSyncProgress(userInfo.value.userId)
      
      // 连接建立
      const connectHandler = (event: MessageEvent) => {
        try {
          const data = JSON.parse(event.data)
          if (data.status === 'connected') {
            console.log('SSE连接已建立:', data)
            clearTimeout(connectionTimeout)
            sseConnected.value = true
            
            // 显示同步开始提示
            ElMessage.success('同步连接已建立,同步开始')
            syncStarted.value = true
            
            // 移除事件监听器
            sseSource.value?.removeEventListener('connect', connectHandler)
            resolve(true)
          }
        } catch (error) {
          console.error('解析连接事件数据失败:', error)
        }
      }
      
      sseSource.value.addEventListener('connect', connectHandler)
      
      // 进度更新
      sseSource.value.addEventListener('progress', (event) => {
        try {
          const data = JSON.parse(event.data)
          console.log('同步进度更新:', data)
          
          // 使用 Vue 的 nextTick 确保 DOM 更新
          nextTick(() => {
            // 确保接收到的数据有意义再更新
            if (typeof data.percentage === 'number') {
              syncProgress.value = data.percentage
            }
            
            if (typeof data.syncedCount === 'number') {
              syncedSongs.value = data.syncedCount
            }
            
            if (typeof data.totalCount === 'number') {
              totalSongs.value = data.totalCount
            }
            
            if (data.currentSong) {
              currentSyncingSong.value = data.currentSong
            }
            
            // 如果同步完成,自动进入完成步骤
            if (data.completed === true || data.percentage >= 100) {
              console.log('收到同步完成信号,进入完成步骤')
              syncStep.value = 4
              
              // 标记所有正在同步的歌单为已同步
              if (syncingPlaylistIds.value.length > 0) {
                playlists.value.forEach(playlist => {
                  if (syncingPlaylistIds.value.includes(playlist.id)) {
                    playlist.isSynced = true
                  }
                })
              }
              
              // 正常关闭连接
              closeSseConnection()
            }
          })
        } catch (error) {
          console.error('解析进度数据失败:', error, event.data)
        }
      })
      
      // 打开连接
      sseSource.value.addEventListener('open', () => {
        console.log('SSE连接已打开')
      })
      
      // 错误处理
      sseSource.value.addEventListener('error', (event) => {
        // 检查是否是正常关闭(同步完成后的关闭)
        if (syncProgress.value >= 100) {
          console.log('同步已完成,SSE连接正常关闭')
          syncStep.value = 4
          closeSseConnection()
          return
        }
        
        console.error('SSE连接错误:', event)
        
        // 检查连接状态
        if (sseSource.value && sseSource.value.readyState === EventSource.CLOSED) {
          console.error('SSE连接已关闭')
          // 只有在同步过程中(非完成状态)才显示错误消息
          if (syncStep.value === 3 && syncProgress.value < 100) {
            ElMessage.error('同步进度连接已断开,请重试')
            // 回退到歌单选择步骤,保留已选歌单
            syncStep.value = 2
          }
        }
        
        // 清除超时计时器
        clearTimeout(connectionTimeout)
        
        // 标记连接失败
        sseConnected.value = false
        resolve(false)
        
        closeSseConnection()
      })
    } catch (error) {
      console.error('创建SSE连接失败:', error)
      ElMessage.error('无法建立同步进度连接')
      resolve(false)
    }
  })
  
  return sseConnectionPromise.value
}

关闭SSE连接

const closeSseConnection = () => {
  if (sseSource.value) {
    sseSource.value.close()
    sseSource.value = null
  }
  sseConnected.value = false
  sseConnectionPromise.value = null
}

开始同步歌单

const startSync = async () => {
  if (selectedPlaylists.value.length === 0) {
    ElMessage.warning('请至少选择一个歌单进行同步')
    return
  }
  
  // 过滤掉已经同步的歌单
  const playlistsToSync = selectedPlaylists.value.filter(id => {
    const playlist = playlists.value.find(p => p.id === id)
    return playlist && !playlist.isSynced
  })
  
  if (playlistsToSync.length === 0) {
    ElMessage.warning('选中的歌单已全部同步,请选择其他歌单')
    return
  }
  
  // 先设置状态
  syncStep.value = 3
  calculateTotalSongs()
  
  // 记录正在同步的歌单ID
  syncingPlaylistIds.value = [...syncingPlaylistIds.value, ...playlistsToSync]
  
  // 只在初始状态时重置进度
  if (syncProgress.value === 0) {
    syncedSongs.value = 0
    currentSyncingSong.value = ''
  }
  
  // 先创建SSE连接接收实时进度
  loading.value = true
  const connectionSuccessful = await createSseConnection()
  loading.value = false
  
  if (!connectionSuccessful) {
    console.error('SSE连接建立失败,无法开始同步')
    ElMessage.error('无法建立同步进度连接,请重试')
    // 回退到歌单选择步骤,但保留已选歌单
    syncStep.value = 2
    return
  }
  
  console.log('SSE连接已就绪,开始同步歌单')
  
  // 遍历同步选中的歌单
  for (const playlistId of playlistsToSync) {
    try {
      const currentPlaylist = playlists.value.find(p => p.id == playlistId)
      if (!currentPlaylist) {
        console.error('找不到歌单信息:', playlistId)
        continue
      }
      
      console.log(`开始同步歌单: ${currentPlaylist.name} (ID: ${playlistId})`)
      
      // 开始同步当前歌单
      const res = await syncNeteasePlaylist.syncPlaylist({
        userId: userInfo.value.userId,
        playlistId: playlistId
      })
      
      if (res.code !== 0) {
        ElMessage.error(`歌单"${currentPlaylist.name}"同步失败: ${res.message || '未知错误'}`)
      } else {
        console.log(`歌单同步请求已发送: ${currentPlaylist.name}`)
        // 标记歌单为已同步
        currentPlaylist.isSynced = true
      }
    } catch (error) {
      console.error('同步歌单失败', error)
      ElMessage.error('同步歌单失败,请稍后重试')
    }
  }
}

在组件销毁时清理资源

onUnmounted(() => {
  if (qrCheckTimer.value) {
    clearInterval(qrCheckTimer.value)
  }
  closeSseConnection()
})

SSE工作流程

整个SSE通信的工作流程如下:

  1. 建立连接

    • 用户选择歌单并点击"开始同步"
    • 前端创建EventSource连接到后端SSE端点
    • 添加事件监听器,准备接收进度更新
  2. 同步开始

    • 后端开始处理同步请求
    • 发送初始状态(0%)事件
  3. 进度更新

    • 后端同步每首歌曲后,发送进度更新事件
    • 前端接收进度事件,更新UI(进度条、已同步数量等)
  4. 同步完成

    • 后端发送完成事件(进度100%)
    • 前端接收完成事件,显示同步完成页面
    • 前端关闭SSE连接
  5. 错误处理

    • 如果连接断开,前端显示错误消息
    • 提供重试机制,保持已选择的歌单列表

实现要点与优化

  1. 连接管理

    • 创建前检查是否已有连接
    • 设置连接超时处理
    • 组件销毁时关闭连接
  2. 数据验证

    • 验证从服务器接收的数据完整性
    • 只在数据有效时更新UI
  3. 错误恢复

    • 监听连接错误事件
    • 区分正常关闭和异常关闭
    • 在错误时提供用户友好的提示
  4. 安全关闭

    • 同步完成后正确关闭连接
    • 防止资源泄漏
  5. UI交互优化

    • 使用nextTick确保DOM更新
    • 显示详细的同步进度信息
    • 提供视觉反馈(进度条、当前同步歌曲名等)

总结

SSE技术为橘子音乐的网易云音乐同步功能提供了实时进度更新的能力,显著提升了用户体验。相比WebSocket,SSE更加轻量,特别适合这种服务器到客户端的单向数据流场景。

通过前后端配合,我们实现了一个稳定、高效的实时进度更新系统,具有以下特点:

  • 实时显示同步进度
  • 显示当前正在同步的歌曲
  • 错误处理和恢复机制
  • 资源自动清理

这种实现方式不仅提供了良好的用户体验,还保持了系统的可靠性和稳定性,是SSE技术在实际项目中应用的一个很好的示例。