# Collecting Logs with Spring Boot AOP, Kafka, and ELK


## Spring Boot configuration

1. Add the Kafka dependency:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```

2. Configure Kafka as follows:

```yaml
# Kafka configuration
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test  # consumer group id; the Kafka topic used below is also named "test"
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```

3. Once Kafka is configured, start the project. The Kafka broker itself must already be running, and the producer and consumer sides need to be up (a command-line sketch for this is given at the end of this post).

## Spring AOP log module

1. Add the Spring Boot AOP Maven dependency:

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
```

2. Define the `@SystemLog` annotation as follows:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface SystemLog {
}
```

3. Handle methods carrying the `@SystemLog` annotation in an aspect:

```java
import com.alibaba.fastjson.JSONObject;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.Ordered;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.text.SimpleDateFormat;
import java.util.Date;

@Aspect
@Component
public class LogInterceptor implements Ordered {

    // Inject the Kafka operations template
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Around("@annotation(systemLog)")
    public Object log(ProceedingJoinPoint joinPoint, SystemLog systemLog) {
        Object result = null;
        try {
            if (joinPoint == null) {
                return null;
            }
            JSONObject message = new JSONObject();
            HttpServletRequest request = ((ServletRequestAttributes)
                    RequestContextHolder.getRequestAttributes()).getRequest();
            Date now = new Date();
            // The date format is easy to change here
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            message.put("time", dateFormat.format(now));                    // timestamp
            message.put("requestURL", request.getRequestURL().toString());  // request URL
            message.put("ip", request.getRemoteAddr());                     // client IP (appears in the sample message below)
            message.put("params", request.getQueryString());                // query parameters
            result = joinPoint.proceed();
            message.put("return", result);                                  // return value
            message.put("requestMethod", joinPoint.getSignature().getName());
            message.put("class", joinPoint.getTarget().getClass().getName());
            // Send the JSON string to the Kafka topic "test" for the consumer side
            kafkaTemplate.send("test", message.toJSONString());
            System.out.println("message:" + message.toJSONString());
        } catch (Throwable throwable) {
            // Log the error message and swallow the exception
            System.err.println(throwable.getMessage());
        }
        return result;
    }

    @Override
    public int getOrder() {
        return 0;
    }
}
```

4. Add the annotation to a method in the interface's implementation class:

```kotlin
// Written in Kotlin
@SystemLog
override fun getUserList(req: HttpServletRequest): RetObj {
    try {
        val sql = "select * from t_user where 1=1 and is_delete=1"
        return RetObj(true, GetPageList(req, sql, null))
    } catch (e: Exception) {
        e.printStackTrace()
        return RetObj(false, e)
    }
}
```

5. The JSON content of the Kafka message that gets sent looks like this:

```json
{"requestURL":"localhost:8003/user/getUserList","ip":"127.0.0.1","requestMethod":"getUserList","time":"2020/04/17 22:36:56","class":"aca6…
```

6. If the message shows up in the Kafka consumer service, the Kafka send succeeded. The next step is syncing the messages from Kafka into Elasticsearch (see the verification sketches at the end of this post).

## Syncing Kafka messages into Elasticsearch with Logstash

1. Create a new file under Logstash's conf directory with the following content:

```conf
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    # topic name
    topics => ["test"]
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  elasticsearch {
    hosts => "127.0.0.1:9200"
    # the index in Elasticsearch is system_log
    index => "system_log"
    codec => "json"
  }
}
```

2. Start Elasticsearch.

3. Start Logstash from the Logstash installation directory (Windows example):

```bat
start /d "F:\Logstash\logstash-7.4.0\logstash-7.4.0\bin" logstash -f F:\…
```

4. Configure Kibana's config file, start Kibana, and browse the `system_log` index there.
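To make steps 3 and 6 of the walkthrough concrete, here is a minimal command-line sketch for bringing up a local broker and watching the `test` topic. This is an assumption-laden sketch rather than part of the original post: it assumes a plain Kafka 2.2+ archive installation with its bundled ZooKeeper and default config files (on Windows, where the author works from `F:\`, the equivalent `.bat` scripts live under `bin\windows`):

```sh
# Start ZooKeeper and the broker (two terminals, from the Kafka install directory)
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

# Create the "test" topic that the aspect produces to and Logstash reads from
bin/kafka-topics.sh --create --topic test --bootstrap-server 127.0.0.1:9092 \
  --partitions 1 --replication-factor 1

# Tail the topic: every request to a @SystemLog method should print one JSON line
bin/kafka-console-consumer.sh --topic test --bootstrap-server 127.0.0.1:9092 --from-beginning
```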
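Similarly, once Logstash is running you can confirm that documents are actually landing in the `system_log` index without opening Kibana. A quick sketch with curl against the same `127.0.0.1:9200` host used in the Logstash output, using only standard Elasticsearch REST endpoints:

```sh
# How many log documents has Logstash indexed so far?
curl "http://127.0.0.1:9200/system_log/_count?pretty"

# Peek at a few of them to confirm the fields (time, requestURL, ip, ...) survived the pipeline
curl "http://127.0.0.1:9200/system_log/_search?pretty&size=5"
```

If both calls return data, the final Kibana step reduces to creating an index pattern for `system_log` and browsing the documents in Discover.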
