mirror of https://github.com/kataras/iris.git
synced 2025-12-18 10:27:06 +00:00

reorganize _examples and add some new examples such as iris+groupcache+mysql+docker

Former-commit-id: ed635ee95de7160cde11eaabc0c1dcb0e460a620
394 _examples/kafka-api/main.go Normal file
@@ -0,0 +1,394 @@
package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"strings"
	"time"

	"github.com/Shopify/sarama"
	"github.com/kataras/iris/v12"
)

/*
	First of all, read about Apache Kafka, then install and run it if you haven't already: https://kafka.apache.org/quickstart

	Secondly, install your favourite Go library for Apache Kafka communication.
	I have chosen Shopify's sarama; I really like `segmentio/kafka-go` as well,
	but it needs more code to get started and you would be bored reading
	everything required before you could begin, so:
		$ go get -u github.com/Shopify/sarama

	The minimum Apache Kafka broker(s) version required is 0.10.0.0, but 0.11.x+ is recommended (tested with 2.5.0).

	Resources:
		- https://github.com/apache/kafka
		- https://github.com/Shopify/sarama/blob/master/examples/http_server/http_server.go
		- DIY
*/
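
// A minimal local Kafka setup, assuming the quickstart tarball was extracted
// (script paths may differ between Kafka versions; treat this as a sketch):
//
//	$ bin/zookeeper-server-start.sh config/zookeeper.properties
//	$ bin/kafka-server-start.sh config/server.properties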

// Package-level variables for the sake of the example,
// but you can define them inside your main func instead
// and pass this config around whenever you need to create a client, a producer or a consumer, or use a cluster.
var (
	// The Kafka brokers to connect to, as a comma separated list.
	brokers = []string{getenv("KAFKA_1", "localhost:9092")}
	// The config which makes our lives easier when passing it around; it pre-configures a lot of things for us.
	config *sarama.Config
)

func getenv(key string, def string) string {
	if value := os.Getenv(key); value != "" {
		return value
	}

	return def
}

func init() {
	config = sarama.NewConfig()
	config.ClientID = "iris-example-client"
	config.Version = sarama.V0_11_0_2
	// config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message.
	config.Producer.Compression = sarama.CompressionSnappy
	config.Producer.Flush.Frequency = 500 * time.Millisecond
	config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message.
	config.Producer.Return.Successes = true

	// For SASL/basic plain text authentication, see config.Net.SASL:
	// config.Net.SASL.Enable = true
	// config.Net.SASL.Handshake = false
	// config.Net.SASL.User = "myuser"
	// config.Net.SASL.Password = "mypass"

	config.Consumer.Return.Errors = true
}
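
// Note: `config.Producer.Return.Successes = true` above is not optional for this
// example; sarama's NewSyncProducer (used further below) refuses to start without it.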

func main() {
	app := iris.New()
	app.OnErrorCode(iris.StatusNotFound, handleNotFound)

	v1 := app.Party("/api/v1")
	{
		topicsAPI := v1.Party("/topics")
		{
			topicsAPI.Post("/", postTopicsHandler) // create a topic.
			topicsAPI.Get("/", getTopicsHandler)   // list all topics.

			topicsAPI.Post("/{topic}/produce", postTopicProduceHandler)  // store to a topic.
			topicsAPI.Get("/{topic}/consume", getTopicConsumeSSEHandler) // retrieve all messages from a topic.
		}
	}

	app.Get("/", docsHandler)

	app.Logger().Infof("Brokers: %s", strings.Join(brokers, ", "))
	// GET      : http://localhost:8080
	// POST, GET: http://localhost:8080/api/v1/topics
	// POST     : http://localhost:8080/api/v1/topics/{topic}/produce?key=my-key
	// GET      : http://localhost:8080/api/v1/topics/{topic}/consume?partition=0&offset=0
	app.Listen(":8080")
}
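
// A few example requests against the API above ("mytopic" is just an illustrative topic name):
//
//	$ curl -X POST http://localhost:8080/api/v1/topics -d '{"topic":"mytopic","partitions":1,"replication":1}'
//	$ curl http://localhost:8080/api/v1/topics
//	$ curl -X POST "http://localhost:8080/api/v1/topics/mytopic/produce?key=my-key" -d 'hello kafka'
//	$ curl -N "http://localhost:8080/api/v1/topics/mytopic/consume?partition=0&offset=0"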

// A simple use-case; you can use templates and views instead, see the "_examples/views" examples.
func docsHandler(ctx iris.Context) {
	ctx.ContentType("text/html") // or ctx.HTML(fmt.Sprintf(...))
	ctx.Writef(`<!DOCTYPE html>
<html>
<head>
<style>
th, td {
	border: 1px solid black;
	padding: 15px;
	text-align: left;
}
</style>
</head>`)
	defer ctx.Writef("</html>")

	ctx.Writef("<body>")
	defer ctx.Writef("</body>")

	ctx.Writef(`
<table>
	<tr>
		<th>Method</th>
		<th>Path</th>
		<th>Handler</th>
	</tr>
`)
	defer ctx.Writef(`</table>`)

	registeredRoutes := ctx.Application().GetRoutesReadOnly()
	for _, r := range registeredRoutes {
		if r.Path() == "/" { // don't list the root, current one.
			continue
		}

		ctx.Writef(`
	<tr>
		<td>%s</td>
		<td>%s%s</td>
		<td>%s</td>
	</tr>
`, r.Method(), ctx.Host(), r.Path(), r.MainHandlerName())
	}
}

type httpError struct {
	Code   int    `json:"code"`
	Reason string `json:"reason"`
}

func (h httpError) Error() string {
	return fmt.Sprintf("Status Code: %d\nReason: %s", h.Code, h.Reason)
}

func fail(ctx iris.Context, statusCode int, format string, a ...interface{}) {
	reason := "unspecified"
	if format != "" {
		reason = fmt.Sprintf(format, a...)
	}

	err := httpError{
		Code:   statusCode,
		Reason: reason,
	}

	ctx.StopWithJSON(statusCode, err)
}
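
// A failing request receives a JSON body shaped by httpError above, e.g.:
//
//	{"code": 400, "reason": "received invalid topic payload: unexpected end of JSON input"}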

func handleNotFound(ctx iris.Context) {
	suggestPaths := ctx.FindClosest(3)
	if len(suggestPaths) == 0 {
		ctx.WriteString("not found")
		return
	}

	ctx.HTML("Did you mean?<ul>")
	for _, s := range suggestPaths {
		ctx.HTML(`<li><a href="%s">%s</a></li>`, s, s)
	}
	ctx.HTML("</ul>")
}

// Topic is the payload for a kafka topic creation.
type Topic struct {
	Topic             string `json:"topic"`
	Partitions        int32  `json:"partitions"`
	ReplicationFactor int16  `json:"replication"`
	Configs           []kv   `json:"configs,omitempty"`
}

type kv struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}
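
// An example payload for POST /api/v1/topics; "retention.ms" is just an
// illustrative Kafka topic config key:
//
//	{
//		"topic": "mytopic",
//		"partitions": 1,
//		"replication": 1,
//		"configs": [{"key": "retention.ms", "value": "60000"}]
//	}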

func createKafkaTopic(t Topic) error {
	cluster, err := sarama.NewClusterAdmin(brokers, config)
	if err != nil {
		return err
	}
	defer cluster.Close()

	topicName := t.Topic
	topicDetail := sarama.TopicDetail{
		NumPartitions:     t.Partitions,
		ReplicationFactor: t.ReplicationFactor,
	}

	if len(t.Configs) > 0 {
		topicDetail.ConfigEntries = make(map[string]*string, len(t.Configs))
		for _, c := range t.Configs {
			// Copy to a fresh variable before taking its address: &c.Value
			// would alias the reused loop variable (pre-Go 1.22), making every
			// entry point to the last value.
			value := c.Value
			topicDetail.ConfigEntries[c.Key] = &value
		}
	}

	return cluster.CreateTopic(topicName, &topicDetail, false)
}
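
// The final `false` argument of CreateTopic is sarama's "validateOnly" flag:
// pass true to have the broker validate the request without actually creating the topic.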

func postTopicsHandler(ctx iris.Context) {
	var t Topic
	err := ctx.ReadJSON(&t)
	if err != nil {
		fail(ctx, iris.StatusBadRequest,
			"received invalid topic payload: %v", err)
		return
	}

	// Try to create the topic inside kafka.
	err = createKafkaTopic(t)
	if err != nil {
		fail(ctx, iris.StatusInternalServerError,
			"unable to create topic: %v", err)
		return
	}

	// Unnecessary statement, but it's here to show that the topic was created;
	// depending on your API expectations you may want to change
	// the status code to something like `iris.StatusCreated`.
	ctx.StatusCode(iris.StatusOK)
}

func getKafkaTopics() ([]string, error) {
	client, err := sarama.NewClient(brokers, config)
	if err != nil {
		return nil, err
	}
	defer client.Close()

	return client.Topics()
}

func getTopicsHandler(ctx iris.Context) {
	topics, err := getKafkaTopics()
	if err != nil {
		fail(ctx, iris.StatusInternalServerError,
			"unable to retrieve topics: %v", err)
		return
	}

	ctx.JSON(topics)
}

func produceKafkaMessage(toTopic string, key string, value []byte) (partition int32, offset int64, err error) {
	// On the broker side, you may want to change the following settings to get
	// stronger consistency guarantees:
	// - For your broker, set `unclean.leader.election.enable` to false
	// - For the topic, you could increase `min.insync.replicas`.

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		return -1, -1, err
	}
	defer producer.Close()

	// We set a message key here; messages with the same key end up in the same
	// partition. With an empty key, messages would be distributed randomly
	// over the different partitions.
	return producer.SendMessage(&sarama.ProducerMessage{
		Topic: toTopic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(value),
	})
}
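
// A hedged alternative sketch, not wired into the handlers above: for heavier
// traffic you would typically share one long-lived AsyncProducer instead of
// dialing a new SyncProducer per request.
func produceKafkaMessageAsync(producer sarama.AsyncProducer, toTopic string, key string, value []byte) {
	// Fire-and-forget enqueue; the producer batches, compresses and retries internally.
	producer.Input() <- &sarama.ProducerMessage{
		Topic: toTopic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(value),
	}
	// With config.Producer.Return.Successes = true (as in this example's config),
	// some goroutine must drain producer.Successes() and producer.Errors(),
	// otherwise the producer eventually blocks.
}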

func postTopicProduceHandler(ctx iris.Context) {
	topicName := ctx.Params().Get("topic")
	key := ctx.URLParamDefault("key", "default")

	// Read the request data and store them as they are (not recommended in production of course, do your own checks here).
	body, err := ioutil.ReadAll(ctx.Request().Body)
	if err != nil {
		fail(ctx, iris.StatusUnprocessableEntity, "unable to read your data: %v", err)
		return
	}

	partition, offset, err := produceKafkaMessage(topicName, key, body)
	if err != nil {
		fail(ctx, iris.StatusInternalServerError, "failed to store your data: %v", err)
		return
	}

	// The tuple (topic, partition, offset) can be used as a unique identifier
	// for a message in a Kafka cluster.
	ctx.Writef("Your data is stored with unique identifier: %s/%d/%d", topicName, partition, offset)
}

type message struct {
	Time time.Time `json:"time"`
	Key  string    `json:"key"`
	// Value could be a []byte, or a json.RawMessage if you are sure
	// that you are sending only JSON; or:
	Value string `json:"value"` // for simple key-value storage.
}
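
// On the wire, each consumed record becomes one server-sent event
// (timestamp and values illustrative), e.g.:
//
//	data: {"time":"2020-05-01T10:00:00Z","key":"my-key","value":"hello kafka"}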

func getTopicConsumeSSEHandler(ctx iris.Context) {
	flusher, ok := ctx.ResponseWriter().Flusher()
	if !ok {
		ctx.StopWithText(iris.StatusHTTPVersionNotSupported, "streaming unsupported")
		return
	}

	ctx.ContentType("text/event-stream")
	ctx.Header("Cache-Control", "no-cache")
	ctx.Header("Connection", "keep-alive")

	master, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		fail(ctx, iris.StatusInternalServerError, "unable to start master consumer: %v", err)
		return
	}

	fromTopic := ctx.Params().Get("topic")
	// Take the partition; defaults to the first one found if no "partition" url query parameter is passed.
	var partition int32
	partitions, err := master.Partitions(fromTopic)
	if err != nil {
		master.Close()
		fail(ctx, iris.StatusInternalServerError, "unable to get partitions for topic '%s': %v", fromTopic, err)
		return
	}

	if len(partitions) > 0 {
		partition = partitions[0]
	}

	partition = ctx.URLParamInt32Default("partition", partition)
	// sarama.OffsetNewest is also available, to start from the most recent message instead.
	offset := ctx.URLParamInt64Default("offset", sarama.OffsetOldest)

	consumer, err := master.ConsumePartition(fromTopic, partition, offset)
	if err != nil {
		ctx.Application().Logger().Error(err)
		master.Close() // close the master here to avoid any leaks, we will exit.
		fail(ctx, iris.StatusInternalServerError, "unable to start partition consumer: %v", err)
		return
	}

	// `OnClose` fires when the request is finally done (all data read and handler exits) or interrupted by the user.
	ctx.OnClose(func() {
		ctx.Application().Logger().Warnf("a client left")

		// "Close shuts down the consumer. It must be called after all child
		// PartitionConsumers have already been closed." <-- That is what
		// the godoc says, but it doesn't work like that here.
		// if err = consumer.Close(); err != nil {
		// 	ctx.Application().Logger().Errorf("[%s] unable to close partition consumer: %v", ctx.RemoteAddr(), err)
		// }
		// So close the master only and omit the first ^ consumer.Close:
		if err = master.Close(); err != nil {
			ctx.Application().Logger().Errorf("[%s] unable to close master consumer: %v", ctx.RemoteAddr(), err)
		}
	})

	for {
		select {
		case consumerErr, ok := <-consumer.Errors():
			if !ok {
				return
			}
			ctx.Writef("data: error: {\"reason\": \"%s\"}\n\n", consumerErr.Error())
			flusher.Flush()
		case incoming, ok := <-consumer.Messages():
			if !ok {
				return
			}

			msg := message{
				Time:  incoming.Timestamp,
				Key:   string(incoming.Key),
				Value: string(incoming.Value),
			}

			b, err := json.Marshal(msg)
			if err != nil {
				ctx.Application().Logger().Error(err)
				continue
			}

			ctx.Writef("data: %s\n\n", b)
			flusher.Flush()
		}
	}
}
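
// Note: this example uses a raw partition consumer for simplicity. In production
// you would more likely use sarama's consumer-group API (sarama.NewConsumerGroup)
// so offsets are committed and partitions are balanced across instances.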