exchange socketio for SSE

This commit is contained in:
Fabricio 2018-11-25 15:10:10 -02:00
parent f7de340a06
commit 1706954423
7 changed files with 85 additions and 83 deletions

View file

@ -9,17 +9,12 @@ import (
var captureID int
var captures CaptureList
type CaptureRepository interface {
Insert(capture Capture)
RemoveAll()
Find(captureID string) *Capture
FindAll() []Capture
}
type CaptureList struct {
items []Capture
mux sync.Mutex
maxItems int
// signals any change in "items"
Updated chan struct{}
}
type Capture struct {
@ -50,9 +45,10 @@ func (c *Capture) Metadata() CaptureMetadata {
}
}
func NewCapturesRepository(maxItems int) CaptureRepository {
func NewCaptureList(maxItems int) *CaptureList {
return &CaptureList{
maxItems: maxItems,
Updated: make(chan struct{}),
}
}
@ -64,6 +60,7 @@ func (c *CaptureList) Insert(capture Capture) {
if len(c.items) > c.maxItems {
c.items = c.items[1:]
}
c.signalsItemsChange()
}
func (c *CaptureList) Find(captureID string) *Capture {
@ -82,13 +79,31 @@ func (c *CaptureList) RemoveAll() {
c.mux.Lock()
defer c.mux.Unlock()
c.items = nil
c.signalsItemsChange()
}
func (c *CaptureList) FindAll() []Capture {
func (c *CaptureList) Items() []Capture {
return c.items
}
func (c *CaptureList) ItemsAsMetadata() []CaptureMetadata {
c.mux.Lock()
defer c.mux.Unlock()
metadatas := make([]CaptureMetadata, len(c.items))
for i, capture := range c.items {
metadatas[i] = capture.Metadata()
}
return metadatas
}
func newID() int {
captureID++
return captureID
}
func (c *CaptureList) signalsItemsChange() {
select {
case c.Updated <- struct{}{}:
default:
}
}

View file

@ -23,8 +23,8 @@ func ReadConfig() Config {
maxCaptures := flag.Int("max-captures", 16, "Set the max number of captures to show in the dashboard")
flag.Parse()
dashboardConnPath := "/socket.io/"
dashboardPath := fmt.Sprintf("/%s/", *dashboard)
dashboardConnPath := fmt.Sprintf("/%s/conn/", *dashboard)
dashboardClearPath := fmt.Sprintf("/%s/clear/", *dashboard)
dashboardItemInfoPath := fmt.Sprintf("/%s/items/", *dashboard)

View file

@ -7,7 +7,6 @@ const dashboardHTML = `
<meta charset="utf-8">
<link rel="icon" href="data:;base64,iVBORw0KGgo=">
<script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/1.7.2/angular.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.1.1/socket.io.slim.js"></script>
<link href="https://fonts.googleapis.com/css?family=Inconsolata:400,700" rel="stylesheet">
<title>Dashboard</title>
<style>
@ -210,7 +209,7 @@ const dashboardHTML = `
</div>
<div class="welcome" ng-show="items.length == 0">
Waiting for requests on http://localhost:{{config.proxyPort}}/
Waiting for requests on http://localhost:<<.ProxyPort>>/
</div>
</div>
@ -222,7 +221,7 @@ const dashboardHTML = `
$scope.show = item => {
$scope.path = item.path;
$scope.selectedId = item.id;
let path = $scope.config.dashboardItemInfoPath + item.id;
let path = <<.DashboardItemInfoPath>> + item.id;
$http.get(path).then(r => {
$scope.request = r.data.request;
$scope.response = r.data.response;
@ -240,7 +239,7 @@ const dashboardHTML = `
}
$scope.clearDashboard = () => {
$http.get($scope.config.dashboardClearPath).then(clearRequestAndResponse);
$http.get(<<.DashboardClearPath>>).then(clearRequestAndResponse);
}
function clearRequestAndResponse() {
@ -276,18 +275,14 @@ const dashboardHTML = `
$scope[key] = data.replace(body, prettyBody);
}
let socket = io();
socket.on('connect', () => {
const evt = new EventSource(<<.DashboardConnPath>>);
evt.addEventListener('connected', e => {
clearRequestAndResponse();
socket.off('config');
socket.off('captures');
socket.on('config', args => {
$scope.config = args;
});
socket.on('captures', captures => {
$scope.items = captures;
$scope.$apply();
});
$scope.$apply();
});
evt.addEventListener('captures', e => {
$scope.items = JSON.parse(e.data);
$scope.$apply();
});
});
</script>

7
go.mod
View file

@ -1,8 +1,3 @@
module github.com/ofabricio/capture
require (
github.com/googollee/go-engine.io v0.0.0-20180829091931-e2f255711dcb // indirect
github.com/googollee/go-socket.io v0.0.0-20181101151912-c8aeb1ed9b49
github.com/gorilla/websocket v1.4.0 // indirect
github.com/ofabricio/curl v0.1.0
)
require github.com/ofabricio/curl v0.1.0

6
go.sum
View file

@ -1,8 +1,2 @@
github.com/googollee/go-engine.io v0.0.0-20180829091931-e2f255711dcb h1:n22Aukg/TjoypWc37dbKIpCsz0VMFPD36HQk1WKvg3A=
github.com/googollee/go-engine.io v0.0.0-20180829091931-e2f255711dcb/go.mod h1:MBpz1MS3P4HtRcBpQU4HcjvWXZ9q+JWacMEh2/BFYbg=
github.com/googollee/go-socket.io v0.0.0-20181101151912-c8aeb1ed9b49 h1:vKXGRzlhWE9TUVhLqAOcgQbfYvReAnsvQQIcnvWMfcg=
github.com/googollee/go-socket.io v0.0.0-20181101151912-c8aeb1ed9b49/go.mod h1:ftBGBMhSYToR5oV4ImIPKvAIsNaTkLC+tTvoNafqxlQ=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/ofabricio/curl v0.1.0 h1:ntXuBULZLQmCdAMxZNXzse069DbAKb/Flxe/2uuZuNk=
github.com/ofabricio/curl v0.1.0/go.mod h1:RtLkZIOgxjm+l0jdj04lrETzu8u5SmPPdLyGAuC4ukg=

89
main.go
View file

@ -5,6 +5,7 @@ import (
"compress/gzip"
"encoding/json"
"fmt"
"html/template"
"io"
"io/ioutil"
"net/http"
@ -14,12 +15,9 @@ import (
"plugin"
"strings"
"github.com/googollee/go-socket.io"
"github.com/ofabricio/curl"
)
var dashboardSocket socketio.Socket
func main() {
config := ReadConfig()
startCapture(config)
@ -27,13 +25,13 @@ func main() {
func startCapture(config Config) {
repo := NewCapturesRepository(config.MaxCaptures)
list := NewCaptureList(config.MaxCaptures)
http.Handle("/", NewPlugin(NewRecorder(repo, NewProxyHandler(config.TargetURL))))
http.Handle(config.DashboardPath, NewDashboardHtmlHandler())
http.Handle(config.DashboardClearPath, NewDashboardClearHandler(repo))
http.Handle(config.DashboardItemInfoPath, NewDashboardItemInfoHandler(repo))
http.Handle(config.DashboardConnPath, NewDashboardSocketHandler(repo, config))
http.Handle("/", NewPlugin(NewRecorder(list, NewProxyHandler(config.TargetURL))))
http.Handle(config.DashboardPath, NewDashboardHtmlHandler(config))
http.Handle(config.DashboardConnPath, NewDashboardConnHandler(list))
http.Handle(config.DashboardClearPath, NewDashboardClearHandler(list))
http.Handle(config.DashboardItemInfoPath, NewDashboardItemInfoHandler(list))
captureHost := fmt.Sprintf("http://localhost:%s", config.ProxyPort)
@ -43,41 +41,58 @@ func startCapture(config Config) {
fmt.Println(http.ListenAndServe(":"+config.ProxyPort, nil))
}
func NewDashboardSocketHandler(repo CaptureRepository, config Config) http.Handler {
server, err := socketio.NewServer(nil)
if err != nil {
fmt.Printf("socket server error: %v\n", err)
}
server.On("connection", func(so socketio.Socket) {
dashboardSocket = so
dashboardSocket.Emit("config", config)
emitToDashboard(repo.FindAll())
func NewDashboardConnHandler(list *CaptureList) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if _, ok := rw.(http.Flusher); !ok {
fmt.Printf("streaming not supported at %s\n", req.URL)
http.Error(rw, "streaming not supported", http.StatusInternalServerError)
return
}
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
fmt.Fprintf(rw, "event: connected\ndata: %s\n\n", "clear")
rw.(http.Flusher).Flush()
for {
jsn, _ := json.Marshal(list.ItemsAsMetadata())
fmt.Fprintf(rw, "event: captures\ndata: %s\n\n", jsn)
rw.(http.Flusher).Flush()
select {
case <-list.Updated:
case <-req.Context().Done():
return
}
}
})
server.On("error", func(so socketio.Socket, err error) {
fmt.Printf("socket error: %v\n", err)
})
return server
}
func NewDashboardClearHandler(repo CaptureRepository) http.Handler {
func NewDashboardClearHandler(list *CaptureList) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
repo.RemoveAll()
emitToDashboard(nil)
list.RemoveAll()
rw.WriteHeader(http.StatusOK)
})
}
func NewDashboardHtmlHandler() http.Handler {
func NewDashboardHtmlHandler(config Config) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Add("Content-Type", "text/html")
fmt.Fprint(rw, dashboardHTML)
t, err := template.New("dashboard template").Delims("<<", ">>").Parse(dashboardHTML)
if err != nil {
msg := fmt.Sprintf("could not parse dashboard html template: %v", err)
fmt.Println(msg)
http.Error(rw, msg, http.StatusInternalServerError)
return
}
t.Execute(rw, config)
})
}
func NewDashboardItemInfoHandler(repo CaptureRepository) http.Handler {
func NewDashboardItemInfoHandler(list *CaptureList) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
id := req.URL.Path[strings.LastIndex(req.URL.Path, "/")+1:]
capture := repo.Find(id)
capture := list.Find(id)
if capture == nil {
http.Error(rw, "Item Not Found", http.StatusNotFound)
return
@ -108,7 +123,7 @@ func NewPlugin(next http.Handler) http.Handler {
return pluginFn(next)
}
func NewRecorder(repo CaptureRepository, next http.Handler) http.Handler {
func NewRecorder(list *CaptureList, next http.Handler) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// save req body for later
@ -129,8 +144,7 @@ func NewRecorder(repo CaptureRepository, next http.Handler) http.Handler {
// record req and res
req.Body = ioutil.NopCloser(bytes.NewReader(reqBody))
res := rec.Result()
repo.Insert(Capture{Req: req, Res: res})
emitToDashboard(repo.FindAll())
list.Insert(Capture{Req: req, Res: res})
})
}
@ -195,14 +209,3 @@ func drain(b io.ReadCloser) (io.ReadCloser, []byte) {
b.Close()
return ioutil.NopCloser(bytes.NewReader(all)), all
}
func emitToDashboard(captures []Capture) {
if dashboardSocket == nil {
return
}
metadatas := make([]CaptureMetadata, len(captures))
for i, capture := range captures {
metadatas[i] = capture.Metadata()
}
dashboardSocket.Emit("captures", metadatas)
}

View file

@ -181,8 +181,8 @@ func TestCaptureIDConcurrence(t *testing.T) {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
rw.WriteHeader(http.StatusOK)
}))
repo := NewCapturesRepository(interactions)
capture := httptest.NewServer(NewRecorder(repo, NewProxyHandler(service.URL)))
list := NewCaptureList(interactions)
capture := httptest.NewServer(NewRecorder(list, NewProxyHandler(service.URL)))
defer service.Close()
defer capture.Close()
@ -205,7 +205,7 @@ func TestCaptureIDConcurrence(t *testing.T) {
// then
// Tests if captures IDs are sequential
captures := repo.FindAll()
captures := list.Items()
if len(captures) == 0 {
t.Fatalf("No captures found")
}