mysql与es同步的其他方案logstash

mysql与es同步的其他方案logstash

2023年7月17日发(作者:)

mysql与es同步的其他⽅案logstash商品搜索与商品详情1. mysql与es同步的其他⽅案2. laravel封装elasticsearchService1. mysql与es同步的其他⽅案1.1 mysql与es同步的其他⽅案logstash介绍Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。Logstash 是⼀个功能强⼤的⼯具,可与各种部署集成。 它提供了⼤量插件,可帮助你解析,丰富,转换和缓冲来⾃各种来源的数据。 如果你的数据需要 Beats 中没有的其他处理,则需要将 Logstash 添加到部署中。应⽤场景1. ⽇志搜索器: logstash采集、处理、转发到elasticsearch存储,在kibana进⾏展⽰2. Elk⽇志分析(elasticsearch+logstash+kibana)3. logstash同步mysql数据库数据到eslogstash安装1. 拉取logstash镜像docker pull logstash:7.12.1(需要与es版本对应)2. 构建logstash容器mkdir /docker/logstash --创建⼀个⽤于存储logstash配置以及插件的⽬录docker run -p 9900:9900 -d --name logstash -v /docker/logstash:/etc/logstash/pipeline --privileged=true logstash:7.12.13. 进⼊容器内部安装 jdbc 和 elasticsearch 插件#进⼊logstash容器内部docker exec -it logstash bash#使⽤logstash-plugin安装器安装logstash-input-jdbc插件,改安装器在bin⽬录下 (此插件镜像新版本⾃带)logstash-plugin install logstash-input-jdbc#安装数据输出到es的插件logstash-plugin install logstash-output-elasticsearch4. 下载jdbc的包/maven2/mysql/mysql-connector-java/8.0.24/5. 修改容器内部配置vi config/ 更改⽂件内容如下:: "0.0.0.0": [ "172.17.0.7:9200" ]vi config/ 更改⽂件内容如下:- : table1 : "/etc/logstash/pipeline/"需要注意的是⾃⼰⽬录是不是存在这些⽂件,不要找错地⽅了,⼀般进⼊容器就会是logstash的安装⽬录,ls查看就能够看到config⽬录的6. 退出容器,配置⽂件创建与编辑 (此处属于全量的配置⽂件)touch /docker/logstash/put { stdin { } jdbc { #注意mysql连接地址⼀定要⽤ip,不能使⽤localhost等 jdbc_connection_string => "jdbc:mysql://172.17.0.4:3306/lmrs_2008_shops" jdbc_user => "root" jdbc_password => "root" #这个jar包的地址是容器内的地址 jdbc_driver_library => "/etc/logstash/pipeline/" jdbc_driver_class => "" jdbc_paging_enabled => "true" jdbc_page_size => "50000" statement => "select a.`name`,_name,_id,_category_id as category_id,_id,,,_count,_count,_time,_time,b.`name` as category, from lmrs_products as a LEFT JOIN schedule => "* * * * *" } } output { elasticsearch { #注意mysql连接地址⼀定要⽤ip,不能使⽤localhost等 hosts => "172.17.0.7:9200" index => "products" document_type => "_doc" document_id => "_id" } stdout { codec => json_lines }}增量配置⽂件如下input { stdin { } jdbc { #注意mysql连接地址⼀定要⽤ip,不能使⽤localhost等 jdbc_connection_string => "jdbc:mysql://192.168.63.1:3306/starsky" jdbc_user => "starsky" jdbc_password => "root" #数据库重连尝试 connection_retry_attempts => "3" #数据库连接可⽤校验超时时间,默认为3600s jdbc_validation_timeout => "3600" #这个jar包的地址是容器内的地址 jdbc_driver_library => "/etc/logstash/pipeline/" jdbc_driver_class => "" #开启分页查询(默认是false) jdbc_paging_enabled => "true" #单次分页查询条数(默认100000,字段较多的话,可以适当调整这个数值) jdbc_page_size => "50000" #执⾏的sql语句 statement => "SELECT ,a.`name`,_name,_id,_category_id AS category_id,_id,,a.`STATUS`,_count,_count,_time,_time FROM lmrs_products AS a where > :sql_last_val #需要记录查询结果某字段的值时,此字段为true,否则默认tracking_colum为timestamp的值 use_column_value => true #是否将字段名转为⼩写,默认为true(如果具备序列化或者反序列化,建议设置为false) lowercase_column_names => false #需要记录的字段,同于增量同步,需要是数据库字段 tracking_column => id #记录字段的数据类型 tracking_column_type => numeric #上次数据存放位置 record_last_run => true #上⼀个sql_last_value的存放路径,必须在⽂件中指定字段的初始值 last_run_metadata_path => "/etc/logstash/pipeline/" #是否清除last_run_metadata_path的记录,需要增量同步这个字段的值必须为false clean_run => false #同步的频率(分 时 天 ⽉ 年)默认为每分钟同步⼀次 schedule => "* * * * *" } } output { elasticsearch { #注意mysql连接地址⼀定要⽤ip,不能使⽤localhost等 hosts => "172.17.0.7:9200" index => "products" document_type => "_doc" document_id => "%{id}" } stdout { codec => json_lines }}last_run_metadata_path => "/etc/logstash/pipeline/":因为需要记录下上次同步的数据id,所以这⾥会有⼀个⽂件进⾏存储这个id,需要在logstash⽬录下去创建⼀个txt⽂件,⽤于存储这个id,同时需要给予权限。不给会出现权限异常问题2. laravel封装elasticsearchServicees中的商品索引信息如下:PUT /products/{ "mappings": { "properties": { "name":{ "type": "text", "analyzer": "ik_smart" }, "long_name":{ "type": "text", "analyzer": "ik_smart" }, "brand_id":{ "type": "integer" }, "category_id":{ "type":"integer" }, "category":{ "type": "keyword" }, "category_path":{ "type": "keyword" }, "shop_id":{ "type":"integer" }, "price":{ "type":"scaled_float", "scaling_factor":100 }, "sold_count":{ "type":"integer" }, "review_count":{ "type":"integer" }, "status":{ "type":"integer" }, "create_time" : { "type" : "date" }, "last_time" : { "type" : "date" } } }}1. 创建⼀个Service,⽤于提供控制器使⽤es服务php artisan make:service /Service/ 'products',//索引 'type' => '_doc',//类型 'body' => [ "query" => [ "bool" => [ 'filter' => [], 'must' => [] ] ] ] ]; /** * @param $size 数据量 * @param $page 索引起始 * @return $this * 搜索分页构建 */ public function paginate($size,$page) { $this->params['body']['from'] = ($page - 1) * $size; $this->params['body']['size'] = $size; return $this; } /** * @return $this * 判断商品是否已上架并经过审核 */ public function IsStatus() { $this->params['body']['query']['bool']['filter'][] = ['term' => ['status' => 1]]; return $this; } /** * @param ProductCategory $category ⽤户传递过来的分类对象或者id * @return $this * 分类筛选 */ public function category(ProductCategory $category) { if ($category->is_directory){ $this->params['body']['query']['bool']['filter'] = [ 'prefix' => ['category_path' => $category->path.$category->id.'-'] ]; }else{ $this->params['body']['query']['bool']['filter'][] = ['term' => ['category_id' => $category->id]]; } return $this; } /** * @param $keywords 关键词数组 * @return $this * 关键词按照权重进⾏搜索 */ public function keywords($keywords) { //如果不是数组需要转为数组 $keywords = is_array($keywords) ? $keywords : [$keywords]; foreach ($keywords as $keyword){ $this->params['body']['query']['bool']['must'][] = [ 'multi_match' => [ 'query' => $keyword, 'fields' => [ 'long_name^3', 'category^2' ] ] ]; } return $this; } /* * 排序 */ public function orderBy($filed,$direction) { if (!isset($this->params['body']['sort'])){ $this->params['body']['sort'] = []; } $this->params['body']['sort'][] = [$filed => $direction]; return $this; } /* * 返回结构体 */ public function getParams() { return $this->params; }}>app/Http/Controllers/Api/V1/input('page',1); //分页的数据数量 $perPage = 20; //调⽤es封装类,增加商品状态为上架条件与分页查询结构 $builder = (new ElasticsearchService())->IsStatus()->paginate($perPage,$page); //分类搜索 if ($request->input('category_id') && $category = $this->category($request->input('category_id')) ){ $builder->category($category); } //具备关键词,按照关键词进⾏搜索(可以是多个) if ($search = $request->input('search','')){ $keywords = array_filter(explode(' ',$search)); $builder->keywords($keywords); } //根据销量,价格,评论数量进⾏排序 if ($order = $request->input('order','')){ if (preg_match('/^(.+)_(asc|desc)$/',$order,$m)){ if (in_array($m[1],['price','sold_count','review_count'])){ $builder->orderBy($m[1],$m[2]); } } } //通过容器注册的es单例调⽤search⽅法到es搜索数据,条件为上⾯构建的结构体 $restful = app('es')->search($builder->getParams()); return response()->json([ "data" => $restful ]); } /** * [category description] * @method category * @param {[type]} $category [description] * @return {[type]} [description] * 查询分类数据 */ public function category($category) { $category_array = explode(',',$category); $category_id = array_pop($category_array); return ProductCategory::query()->where('id',$category_id)->first(); }}>

发布者:admin,转转请注明出处:http://www.yc00.com/xiaochengxu/1689539674a264492.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信