Skip to content

Commit

Permalink
feat: Show posts from firehose on dashboard
Browse files Browse the repository at this point in the history
Add a list view of up to 500 firehose posts on the dashboard. This new
feature introduces some changes in the plumbing in the serve command.
Prevously there were only one channel that would allow the firehose to
send create posts events to the db writer.

Now both the db writer and server is interested in these events. Further
more one channel is not enough for the server use case. In golang
channels are single receiver. Even in a fan out pattern with multiple
listeners, a message will be read by at most one receiver!

To enable 1 to N broadcasting you need N channels. This commit
introduces a new Broadcaster receiver struct that extends the sync.Mutex
struct to handle locking so that go routines don't get into conflict
when using the broadcaster.

It contains a map of uuids keys (to identify a single SSE channel) and
channel values. When a post is posted by the firehose to its channel
glue code picks it up and passes it on to every channel in the
broadcaster struct.

To pass the posts to the client side dashboard Server Sent Events are
used. The client creates an EventSource to initate the SSE connection.
Upon connection the go server creates a channel and adds it to the
broadcaster map.

The first message passed by the server is the key identifying the SSE
channel. This way the client can send a DELETE request before
disconnecting to allow the server to clean up immediately. Otherwise the
connection is held open until the underlying TCP connection disconnects
and the server can no longer write to the connection. This can take a
bit of time.

The client shows the posts in a div which takes up at most 60% of the
viewport height. it then uses scroll inside the element to allow
scrolling posts. It only shows the data available in the firehose post
format which means usernames and profile pictures are not available.
  • Loading branch information
snorremd committed Oct 21, 2023
1 parent ea3cd21 commit 3fa933a
Show file tree
Hide file tree
Showing 11 changed files with 318 additions and 78 deletions.
37 changes: 30 additions & 7 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"norsky/db"
"norsky/firehose"
"norsky/models"
"norsky/server"
"os"
"os/signal"
Expand Down Expand Up @@ -80,26 +81,48 @@ func serveCmd() *cli.Command {

// Channel for subscribing to bluesky posts
postChan := make(chan interface{})
dbPostChan := make(chan interface{}) // Channel for writing posts to the database
broadcaster := server.NewBroadcaster()

// Setup the server and firehose
app := server.Server(&server.ServerConfig{
Hostname: hostname,
Reader: db.NewReader(database),
Hostname: hostname,
Reader: db.NewReader(database),
Broadcaster: broadcaster,
})
fh := firehose.New(postChan, ctx.Context)
dbwriter := db.NewWriter(database, postChan)

dbwriter := db.NewWriter(database, dbPostChan)

// Graceful shutdown via wait group
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
var wg sync.WaitGroup

// Graceful shutdown logic
go func() {
<-c
fmt.Println("Gracefully shutting down...")
app.ShutdownWithTimeout(60 * time.Second)
defer wg.Add(-3) // Decrement the waitgroup counter by 2 after shutdown of server and firehose
app.ShutdownWithTimeout(5 * time.Second) // Wait 5 seconds for all connections to close
fh.Shutdown()
broadcaster.Shutdown()
defer wg.Add(-4) // Decrement the waitgroup counter by 2 after shutdown of server and firehose

}()

// Some glue code to pass posts from the firehose to the database and/or broadcaster
// Ideally one might want to do this in a more elegant way
// TODO: Move broadcaster into server package, i.e. make server a receiver and handle broadcaster and fiber together
go func() {
for post := range postChan {
switch post := post.(type) {
case models.CreatePostEvent:
dbPostChan <- post
broadcaster.Broadcast(post) // Broadcast new post to SSE clients
default:
dbPostChan <- post
}
}
}()

go func() {
Expand All @@ -109,6 +132,7 @@ func serveCmd() *cli.Command {

go func() {
fmt.Println("Starting server...")

if err := app.Listen(fmt.Sprintf("%s:%d", host, port)); err != nil {
log.Error(err)
c <- os.Interrupt
Expand All @@ -121,13 +145,12 @@ func serveCmd() *cli.Command {
}()

// Wait for both the server and firehose to shutdown
wg.Add(3)
wg.Add(4)
wg.Wait()

log.Info("Norsky feed generator stopped")

return nil

},
}
}
20 changes: 5 additions & 15 deletions db/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"database/sql"
"norsky/models"
"strconv"
"strings"
"time"

sqlbuilder "github.com/huandu/go-sqlbuilder"
log "github.com/sirupsen/logrus"
)

type Reader struct {
Expand All @@ -26,12 +24,12 @@ func NewReader(database string) *Reader {
}
}

func (reader *Reader) GetFeed(lang string, limit int, postId int64) ([]models.Post, error) {
func (reader *Reader) GetFeed(lang string, limit int, postId int64) ([]models.FeedPost, error) {

// Return next limit posts after cursor, ordered by created_at and uri

sb := sqlbuilder.NewSelectBuilder()
sb.Select("id", "uri", "created_at", "group_concat(language)").From("posts")
sb.Select("id", "uri").From("posts")
if postId != 0 {
sb.Where(
sb.LessThan("id", postId),
Expand All @@ -53,18 +51,15 @@ func (reader *Reader) GetFeed(lang string, limit int, postId int64) ([]models.Po
defer rows.Close()

// Scan rows, collapse languages into single post
var posts []models.Post
var posts []models.FeedPost

for rows.Next() {
var post models.Post
var langs string
var post models.FeedPost
// Scan row and
if err := rows.Scan(&post.Id, &post.Uri, &post.CreatedAt, &langs); err != nil {
if err := rows.Scan(&post.Id, &post.Uri); err != nil {
return nil, err
}

// Split languages into a slice and add to the post model
post.Languages = strings.Split(langs, ",")
posts = append(posts, post)
}

Expand Down Expand Up @@ -125,11 +120,6 @@ func (reader *Reader) GetPostCountPerTime(lang string, timeAgg string) ([]models

sql, args := sb.BuildWithFlavor(sqlbuilder.Flavor(sqlbuilder.SQLite))

log.WithFields(log.Fields{
"sql": sql,
"args": args,
}).Info("Get posts per hour")

rows, err := reader.db.Query(sql, args...)
if err != nil {
return nil, err
Expand Down
5 changes: 2 additions & 3 deletions db/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ func (writer *Writer) Subscribe() {
}

case post := <-writer.postChan:
log.WithField("post", post).Info("Received post")
switch post := post.(type) {
case models.CreatePostEvent:
case *models.CreatePostEvent:
createPost(writer.db, post.Post)
case models.DeletePostEvent:
case *models.DeletePostEvent:
deletePost(writer.db, post.Post)
default:
log.Info("Unknown post type")
Expand Down
2 changes: 1 addition & 1 deletion feeds/feeds.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func genericAlgo(reader *db.Reader, cursor string, limit int, lang string) (*mod
}

if posts == nil {
posts = []models.Post{}
posts = []models.FeedPost{}
}

var nextCursor *string
Expand Down
1 change: 1 addition & 0 deletions firehose/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (firehose *Firehose) Subscribe() {
}

func (firehose *Firehose) Shutdown() {
// TODO: Graceful shutdown here as "Error handling repo stream: read tcp use of closed network connection "
firehose.scheduler.Shutdown()
firehose.conn.Close()
fmt.Println("Firehose shutdown")
Expand Down
104 changes: 99 additions & 5 deletions frontend/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import {
createResource,
Resource,
Accessor,
createComputed,
createMemo,
createEffect,
onCleanup,
For,
} from "solid-js";
import { Chart, Title, Tooltip, Legend, Colors, TimeScale, ChartDataset, ChartType, Point, TimeUnit, TimeSeriesScale } from "chart.js";
import { Line } from "solid-chartjs";
import { type ChartData, type ChartOptions } from "chart.js";
import { formatRelative } from 'date-fns'
import colors from "tailwindcss/colors";
import "chartjs-adapter-date-fns";

Expand Down Expand Up @@ -157,7 +158,7 @@ const PostPerHourChart: Component<{ data: Resource<Data[]>, time: Accessor<strin
data={cdata()}
options={coptions()}
width={500}
height={200}
height={300}
/>
</div>
);
Expand All @@ -178,18 +179,110 @@ const PostPerTime: Component<{

const [data] = createResource(() => [time(), lang] as const, fetcher);
return (
<div>
<div class="h-full">
<h1 class="text-2xl text-sky-300 text-center pb-8">{label}</h1>
<PostPerHourChart time={time} data={data} />
</div>
);
};

interface Post {
createdAt: number,
languages: string[]
text: string
uri: string
}

const langToName = (lang: string): string => {
switch (lang) {
case "nb":
return "Norwegian bokmål"
case "nn":
return "Norwegian nynorsk"
case "smi":
return "Sami"
default:
return lang
}
}

const PostFirehose: Component = () => {
const [key, setKey] = createSignal<string>(); // Used to politely close the event source
const [posts, setPosts] = createSignal<Post[]>([]);
const [eventSource, setEventSource] = createSignal<EventSource | null>(null);

onMount(() => {
console.log("Mounting event source")
const es = new EventSource(`${host}/dashboard/feed/sse`);
setEventSource(es);

es.onmessage = (e) => {
if(key() === undefined) {
setKey(e.data);
return;
}
console.log("Message received", e);
const post = JSON.parse(e.data) as Post;
setPosts((posts) => [post, ...posts.slice(0, 499)]); // Limit to 500 posts
};
});

const close = async () => {
console.log("Closing event source");
eventSource()?.close();
await fetch(`${host}/dashboard/feed/sse?key=${key()}`, { method: "DELETE" })
}

if (import.meta.hot) {
import.meta.hot.accept(close);
}

window.addEventListener("beforeunload", close)


// Display a pretty list of the posts
// Set a max height and use overflow-y: scroll to make it scrollable
// Height should be whatever the parent is.

return <div class="flex flex-col gap-4 h-[800px] max-h-[65vh] col-span-full md:col-span-2">
<h1 class="text-2xl text-sky-300 text-center pb-8">Recent posts</h1>
<div class="overflow-y-scroll scroll h-full gap-4 flex flex-col no-scrollbar bg-zinc-800 rounded-lg p-4">
<For each={posts()}>
{(post) => {
const createdAt = formatRelative(new Date(post.createdAt * 1000), new Date())
// Match regex to get the profile and post id
// URI example: at://did:plc:opkjeuzx2lego6a7gueytryu/app.bsky.feed.post/3kcbxsslpu623
// profile = did:plc:opkjeuzx2lego6a7gueytryu
// post = 3kcbxsslpu623

const regex = /at:\/\/(did:plc:[a-z0-9]+)\/app.bsky.feed.post\/([a-z0-9]+)/
const [profile, postId] = regex.exec(post.uri)!.slice(1)
const bskyLink = `https://bsky.app/profile/${profile}/post/${postId}`
return <div class="flex flex-col gap-4 p-4 bg-zinc-900 rounded-md">
<div class="flex flex-row justify-between">
<p class="text-zinc-400">{createdAt}</p>
<p class="text-zinc-400">{post.languages.map(langToName).join(", ")}</p>
</div>
<p class="text-zinc-300 w-full max-w-[80ch]">{post.text}</p>

{/* Link to post on Bluesky */}
<div class="flex flex-row justify-end">
<a class="text-sky-300 hover:text-sky-200 underline" href={bskyLink} target="_blank">View on Bsky</a>
</div>
</div>
}}
</For>
</div>
</div>;
}



const App: Component = () => {
const [time, setTime] = createSignal<string>("hour");

return (
<div class="flex flex-col p-6 md:p-8 lg:p-16 gap-16">
<div class="flex flex-col p-6 md:p-8 lg:p-16">
{/* Add a header here showing the Norsky logo and the name */}
<div class="flex justify-start items-center gap-4">
<img src={icon} alt="Norsky logo" class="w-16 h-16" />
Expand All @@ -213,6 +306,7 @@ const App: Component = () => {
<PostPerTime time={time} lang="nb" label="Norwegian bokmål" />
<PostPerTime time={time} lang="nn" label="Norwegian nynorsk" />
<PostPerTime time={time} lang="smi" label="Sami" />
<PostFirehose />
</div>
</div>
</div>
Expand Down
15 changes: 15 additions & 0 deletions frontend/index.css
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
@tailwind base;
@tailwind components;
@tailwind utilities;

@layer utilities {
@variants responsive {
/* Hide scrollbar for Chrome, Safari and Opera */
.no-scrollbar::-webkit-scrollbar {
display: none;
}

/* Hide scrollbar for IE, Edge and Firefox */
.no-scrollbar {
-ms-overflow-style: none; /* IE and Edge */
scrollbar-width: none; /* Firefox */
}
}
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/bluesky-social/indigo v0.0.0-20230919180850-251fff6498dc
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cqroot/prompt v0.9.1
github.com/gofiber/fiber/v2 v2.49.2
github.com/gofiber/fiber/v2 v2.50.0
github.com/golang-migrate/migrate/v4 v4.16.2
github.com/gorilla/websocket v1.5.0
github.com/huandu/go-sqlbuilder v1.22.0
Expand All @@ -15,7 +15,7 @@ require (
github.com/sirupsen/logrus v1.9.3
github.com/strideynet/bsky-furry-feed v0.0.37
github.com/urfave/cli/v2 v2.25.1
github.com/valyala/fasthttp v1.49.0
github.com/valyala/fasthttp v1.50.0
golang.org/x/crypto/x509roots/fallback v0.0.0-20230928175846-ec07f4e35b9e
)

Expand Down Expand Up @@ -116,7 +116,7 @@ require (
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/time v0.3.0 // indirect
Expand Down
Loading

0 comments on commit 3fa933a

Please sign in to comment.