logstash使用相关


logstash作为中间件,可以打通kafka、RabbitMQ等与Elasticsearch、Redis的信息传递,值得研究。


正文

介绍

Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。

  • 优势

Logstash 主要的优点就是它的灵活性,主要因为它有很多插件,详细的文档以及直白的配置格式让它可以在多种场景下应用。 我们基本上可以在网上找到很多资源,几乎可以处理任何问题。

  • 劣势

Logstash 致命的问题是它的性能以及资源消耗(默认的堆大小是 1GB)。尽管它的性能在近几年已经有很大提升,与它的替代者们相比还是要慢很多的。 这里有 Logstash 与 rsyslog 性能对比以及Logstash 与 filebeat 的性能对比。它在大数据量的情况下会是个问题。

另一个问题是它目前不支持缓存,目前的典型替代方案是将 Redis 或 Kafka 作为中心缓冲池。

  • 典型应用场景

因为 Logstash 自身的灵活性以及网络上丰富的资料,Logstash 适用于原型验证阶段使用,或者解析非常的复杂的时候。 在不考虑服务器资源的情况下,如果服务器的性能足够好,我们也可以为每台服务器安装 Logstash 。我们也不需要使用缓冲, 因为文件自身就有缓冲的行为,而 Logstash 也会记住上次处理的位置。

如果服务器性能较差,并不推荐为每个服务器安装 Logstash ,这样就需要一个轻量的日志传输工具, 将数据从服务器端经由一个或多个 Logstash 中心服务器传输到 Elasticsearch。

随着日志项目的推进,可能会因为性能或代价的问题,需要调整日志传输的方式(log shipper)。当判断 Logstash 的性能是否足够好时, 重要的是对吞吐量的需求有着准确的估计,这也决定了需要为 Logstash 投入多少硬件资源。

logstash安装

logstash安装,可以看这里:https://ibaiyang.github.io/blog/linux/2020/04/03/linux-Logstash安装.html

RabbitMQ源

针对队列数据:

{
    "log_id":"5e86e67015791405619",
    "indexname":"demo_logs_dev",
    "time":"2020-04-03 15:32:00",
    "category":"yii\\base\\View::renderFile",
    "level":"trace",
    "step":7,
    "ip_address":"127.0.0.1",
    "msg":"Rendering view file: \/var\/www\/pear-adminlte\/backend\/views\/user\/users-lists.php"
}

看一下从RabbitMQ取数据到Es的logstash配置的例子:

input{
    rabbitmq {
        host=>"ip"                             # 这里填写Rabbitmq的地址,确保可以ping通
        port=> 5672                            # 这里填写Rabbitmq的端口
        user=>"guest"                          # 这里填写Rabbitmq的用户名
        password=>"guest"                      # 这里填写Rabbitmq的密码
        queue=>"logstash"                      # 这里填写Rabbitmq的队列的名称,自定义的
        durable=> true                         # 这里填写Rabbitmq的队列的durable属性
        codec=>json                            # 这里填写Rabbitmq的队列的内容是什么格式
    }
}

output {
    elasticsearch {
        hosts => ["192.168.1.200:9200"]
        index => "%{indexname}-%{+YYYY.MM.dd}"
    }
}

output中的indexname可以是你在信息中自定义的名字,这里也可以确定写一个名字,存入ES就是。

看一个使用的例子:

rabbitmq-logstash.conf:

input {
    rabbitmq {
        exclusive => false
        host => '172.16.5.101'
        user => 'mquser'
        password => 'ABCabc123'
        vhost => '/'
        ack => false
        prefetct_count => 50
        auto_delete => false
        durable => true
        exchange => 'exname'
        key => 'log_routing'
        queue => 'logs'
        threads => 2
    }
}

output {
    elasticsearch {
        hosts => ["http://127.0.0.1:9200"]
        index => "%{indexname}-%{+YYYY.MM.dd}"
    }
}

存到Es中的数据格式:

{
  "_index": "demo_logs_dev-2020.04.03",
  "_type": "_doc",
  "_id": "9nEWQHEBzd1pqLGtXylI",
  "_version": 1,
  "_score": 0,
  "_source": {
    "ip_address": "127.0.0.1",
    "@timestamp": "2020-04-03T12:49:00.093Z",
    "msg": "Rendering view file: /var/www/pear-adminlte/backend/views/user/users-page.php",
    "step": 7,
    "@version": "1",
    "time": "2020-04-03 20:49:00",
    "indexname": "demo_logs_dev",
    "category": "yii\\base\\View::renderFile",
    "level": "trace",
    "log_id": "5e8730bc014ec405091"
  },
  "fields": {
    "@timestamp": [
      "2020-04-03T12:49:00.093Z"
    ]
  }
}

kafka源

kafka-logstash.conf:

input {
    kafka {
        bootstrap_servers => ["192.168.40.001:9092,192.168.40.002:9092,192.168.40.003:9092"]
        client_id => "test"
        group_id => "test"
        #auto_offset_reset => "latest"
        consumer_threads => 6
        #decorate_events => true
        topics => ["logstash"]
        codec => json
    }
}

output {
    if [indexname] =~ "demo_log*" {
        elasticsearch {
            hosts => ["192.168.40.101:9200"]
            index => "%{indexname}-%{+YYYY.MM.dd}"
        }
        elasticsearch {
            hosts => ["192.168.40.102:9200"]
            index => "%{indexname}-%{+YYYY.MM.dd}"
        }
    }
    if [indexname] !~ "demo_log*" {
        elasticsearch {
            hosts => ["192.168.76.103:9200"]
            index => "%{indexname}-%{+YYYY.MM}"
        }
    }
}






参考资料

Rabbitmq通过logstash把queue中的数据保存到ElasticSearch https://blog.csdn.net/u011051912/article/details/81234408

https://www.elastic.co/cn/products/logstash

https://www.elastic.co/guide/en/logstash/current/index.html

https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html

https://blog.csdn.net/u011051912/article/details/81234408

https://www.jianshu.com/p/9097817d3379

https://www.jianshu.com/p/288e9c1db99f

https://www.extlight.com/2017/10/30/Logstash-%E5%9F%BA%E7%A1%80%E5%85%A5%E9%97%A8/

https://www.cnblogs.com/moonlightL/p/7760512.html

https://www.cnblogs.com/DennisXie/p/5785994.html

使用 logstash + kafka + elasticsearch 实现日志监控 https://www.centos.bz/2018/01/%E4%BD%BF%E7%94%A8-logstash-kafka-elasticsearch-%E5%AE%9E%E7%8E%B0%E6%97%A5%E5%BF%97%E7%9B%91%E6%8E%A7/

详解日志采集工具–Logstash、Filebeat、Fluentd、Logagent对比 https://zhuanlan.zhihu.com/p/63725444

Logstash概念与原理、与Flume的对比 https://blog.csdn.net/songfeihu0810232/article/details/94406608


返回