悠悠楠杉
使用Golang实现SSE服务:构建高效的服务器推送事件方案
引言
在现代Web应用中,实时数据推送已成为提升用户体验的关键技术。Server-Sent Events (SSE) 作为一种轻量级的服务器推送技术,相比WebSocket更简单易用,特别适合单向数据推送场景。本文将详细介绍如何使用Golang构建高性能的SSE服务。
SSE技术概述
SSE(Server-Sent Events)是HTML5标准的一部分,允许服务器通过HTTP连接向客户端推送事件。它基于简单的文本协议,使用长轮询机制,具有以下特点:
- 单向通信(服务器到客户端)
- 基于HTTP协议,无需特殊协议
- 自动重连机制
- 轻量级,实现简单
Golang实现SSE服务的核心组件
1. 基本HTTP服务器设置
首先,我们需要创建一个基本的HTTP服务器:
go
package main
import (
"log"
"net/http"
)
func main() {
http.HandleFunc("/events", sseHandler)
log.Fatal(http.ListenAndServe(":8080", nil))
}
2. SSE处理器实现
SSE处理器的核心是设置正确的响应头并保持连接打开:
go
func sseHandler(w http.ResponseWriter, r http.Request) {
// 设置SSE必需的响应头
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "")
// 创建通道用于通知连接关闭
closeNotifier, ok := w.(http.CloseNotifier)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
closeChan := closeNotifier.CloseNotify()
// 保持连接打开并定期发送数据
for {
select {
case <-closeChan:
log.Println("Connection closed by client")
return
default:
// 构造SSE格式消息
event := "message"
data := "当前时间: " + time.Now().Format(time.RFC3339)
// 写入事件数据
fmt.Fprintf(w, "event: %s\n", event)
fmt.Fprintf(w, "data: %s\n\n", data)
// 刷新缓冲区
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
flusher.Flush()
// 间隔时间
time.Sleep(2 * time.Second)
}
}
}
高级SSE实现技巧
1. 事件类型区分
SSE支持多种事件类型,可以用于区分不同种类的消息:
go
func sendEvent(w io.Writer, eventType, data string) {
fmt.Fprintf(w, "event: %s\n", eventType)
fmt.Fprintf(w, "data: %s\n\n", data)
}
// 使用示例
sendEvent(w, "statusUpdate", {"status":"processing","progress":45}
)
sendEvent(w, "logMessage", "New user connected")
2. 重连时间控制
可以指定客户端断开后重连的时间:
go
fmt.Fprintf(w, "retry: 5000\n") // 5秒后重试
3. 消息ID跟踪
通过消息ID帮助客户端识别是否丢失了消息:
go
messageID := 1
for {
fmt.Fprintf(w, "id: %d\n", messageID)
messageID++
// ...发送消息内容...
}
实际应用场景实现
1. 实时日志监控系统
go
func logStreamHandler(w http.ResponseWriter, r *http.Request) {
// 设置SSE头
w.Header().Set("Content-Type", "text/event-stream")
flusher, _ := w.(http.Flusher)
// 模拟日志生成
logSources := []string{"API", "Database", "Auth", "Cache"}
for {
source := logSources[rand.Intn(len(logSources))]
level := []string{"INFO", "WARN", "ERROR"}[rand.Intn(3)]
message := fmt.Sprintf("[%s] %s: Sample log message %d", level, source, rand.Intn(1000))
logEntry := struct {
Timestamp string `json:"timestamp"`
Source string `json:"source"`
Level string `json:"level"`
Message string `json:"message"`
}{
Timestamp: time.Now().Format(time.RFC3339),
Source: source,
Level: level,
Message: message,
}
jsonData, _ := json.Marshal(logEntry)
fmt.Fprintf(w, "data: %s\n\n", jsonData)
flusher.Flush()
time.Sleep(time.Duration(rand.Intn(3)+1) * time.Second)
}
}
2. 实时股票价格推送
go
type Stock struct {
Symbol string json:"symbol"
Price float64 json:"price"
Change float64 json:"change"
Timestamp string json:"timestamp"
}
func stockHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
flusher, _ := w.(http.Flusher)
stocks := []string{"AAPL", "GOOGL", "MSFT", "AMZN", "FB"}
priceMap := make(map[string]float64)
for _, symbol := range stocks {
priceMap[symbol] = 100 + rand.Float64()*1000
}
for {
for symbol, price := range priceMap {
change := (rand.Float64() - 0.5) * 10
newPrice := price + change
if newPrice < 10 {
newPrice = 10
}
priceMap[symbol] = newPrice
stock := Stock{
Symbol: symbol,
Price: newPrice,
Change: change,
Timestamp: time.Now().Format(time.RFC3339),
}
jsonData, _ := json.Marshal(stock)
fmt.Fprintf(w, "event: stockUpdate\n")
fmt.Fprintf(w, "data: %s\n\n", jsonData)
flusher.Flush()
}
time.Sleep(2 * time.Second)
}
}
性能优化与注意事项
1. 连接管理
- 使用连接池管理大量并发SSE连接
- 实现心跳机制检测死连接
- 考虑使用context处理连接超时
go
func sseHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
// ...设置SSE头...
ticker := time.NewTicker(15 * time.Second) // 心跳间隔
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// 发送心跳保持连接
fmt.Fprintf(w, ": heartbeat\n\n")
flusher.Flush()
case <-time.After(1 * time.Second):
// 正常业务数据发送
// ...
}
}
}
2. 错误处理
- 处理客户端断开连接的情况
- 记录连接错误和异常
- 实现优雅的重连机制
3. 扩展性与集群支持
- 使用Redis Pub/Sub实现多实例消息广播
- 考虑使用Nginx代理SSE连接
- 实现负载均衡下的连接粘性
客户端实现示例
完整方案需要配套的客户端代码,以下是JavaScript示例:
javascript
const eventSource = new EventSource('/events');
eventSource.addEventListener('open', (e) => {
console.log('Connection opened');
});
eventSource.addEventListener('message', (e) => {
console.log('Message:', e.data);
});
eventSource.addEventListener('stockUpdate', (e) => {
const stockData = JSON.parse(e.data);
updateStockTable(stockData);
});
eventSource.addEventListener('error', (e) => {
console.error('Error:', e);
// 可根据状态码决定是否重连
});