透過kafka搜集大資料

Kafka是一套開源的pubsub系統,可以用來作為資料暫留的佇列,作為大資料搜集的工具是再好不過,fluentd的kafka plugin專案可以在這邊找到: https://github.com/htgc/fluent-plugin-kafka。

Installation

一般在本機安裝,可以透過ruby的gem來安裝fluentd kafka的plugin..

gem install fluent-plugin-kafka

Build your docker

在Docker的環境,我們可以用下面的Dockerfile配置來準備執行環境...

Dockerfile:

FROM ruby:2.2.0
MAINTAINER [email protected]
RUN apt-get update
RUN gem install fluentd -v "~>0.12.3"
RUN mkdir /etc/fluent
RUN apt-get install -y libcurl4-gnutls-dev make
RUN /usr/local/bin/gem install fluent-plugin-kafka && /usr/local/bin/gem install fluent-plugin-secure-forward && \
  /usr/local/bundle/bin/secure-forward-ca-generate /tmp/ notasecret
#RUN /usr/local/bin/gem install fluent-plugin-elasticsearch
ADD fluent.conf /etc/fluent/
ENTRYPOINT ["/usr/local/bundle/bin/fluentd", "-c", "/etc/fluent/fluent.conf"]

Build:

docker build -t linkeriot/fluentd .

Run a sample

在執行前,需要有fluentd的設定檔... 下面準備一個可以接受http input與stdout + kafka output的設定檔...

fluent.conf:

<source>
  type http
  port 9880
</source>

<match *.**>
  type copy
  <store>
    type stdout
  </store>
  <store>
    @type kafka
    @id kafka_output
    brokers your-ip-address:your-port
    output_data_type json
    default_topic iot
  </store>
</match>

在設定檔中,指定kafka的topic為iot,且格式為json。

Run:

docker run -it -p 9880:9880 -v $(pwd)/fluent.conf:/fluent.conf linkeriot/fluentd bash

上面的執行部分,由於需要測試http input,因此需要打開9880 port對外。如果您的fluent.conf未包入image中,則可以透過-v掛載該設定檔到image中...

最後,由於已經export 9880 port,因此可以透過curl來輸入資料...

curl -X POST -d 'json={"action":"login","user":423}' http://your-gw-ip:9880/iot.message.aaa

最後,我們可以透過下面的指令來確認kafka有收到資料與否...

kafkacat -C -b "52.79.124.228:8083" -t iot -p 0 -e

results matching ""

    No results matching ""