ES与MySQL数据同步(全量与增量)

ES与MySQL数据同步(全量与增量)

2023年7月17日发(作者:)

ES与MySQL数据同步(全量与增量)思考:

1. 考虑ES如何与数据库实现同步?2. ES如何查询多个字段?3. 如何构建商品服务(包含搜索功能)?4. ES肯定是集群的,如何集群?5. ⼀个项⽬当它做⼤做当后都可能会需要将数据从传统的数据库同步到另⼀种数据集合中,⼀般⽤于提⾼查询效率或将数据进⾏备份的⽬的。其中⽐较常见的⼀种同步⽅式是从关系型数据库同步到es6. MQ与logstash实现ES与数据库同步区别7. 基于Docker⽅式实现Elasticsearch集群8. 整合Elasticsearch IK分词器⼀、理论基础1.1、es与数据库是如何保持⼀致的呢?原理:1.2、MySQL与ES实时同步常⽤插件参考:1.3、logstash-input-jdbc同步插件原理:作⽤:使⽤ logstash-input-jdbc 插件读取 mysql 的数据如何安装logstash-input-jdbc插件:原理:定时执⾏⼀个 sql,然后将 sql 执⾏的结果写⼊到流中,增量获取的⽅式没有通过 binlog ⽅式同步,⽽是⽤⼀个递增字段作为条件去查询,每次都记录当前查询的位置,由于递增的特性,只需要查询⽐当前⼤的记录即可获取这段时间内的全部增量,⼀般的递增字段有两种,AUTO_INCREMENT 的主键 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只适⽤于那种只有插⼊没有更新的表,update_time 更加通⽤⼀些,建议在 mysql 表设计的时候都增加⼀个 update_time 字段1.4、什么是全量同步和增量同步(1)全量同步什么是全量同步:将⼀个mysql的整个表的所有数据都同步到es中常⽤插件是logstash-input-jdbc,logstash通过sql语句分区间对数据进⾏查询,然后输出到es进⾏实现。(2)增量同步(canal)什么是增量同步:业务场景需要实时性较⾼并对要求对数据库的压⼒⽐较⼩。logstash不再适合增量同步,logstash是基于sql来完成的,并且是通过cron表达式来调⽤。阿⾥在做⼤做强的过程中也遇到过类似的问题,为了达到性能的最优,他们⾃⼰通过JAVA代码实现了mysql的数据同步功能,通过解析mysql 的⽇志进⾏实现的,并且把这个项⽬开源——canalcanal地址:原理:简单看了下代码,它⾥⾯是通过⼀个server和⼀个client来进⾏实现的,及server负责mysql的⽇志监听与收集,然后再传给client端,server与client端使⽤的netty进⾏数据的tcp通信,现在也有了kafka和rocketMQ作为通信渠道的版本。同时canal也有专门的可视化监控界⾯,⽅便进⾏查看,同时也有HA(hadoop集群)的实现⽅式,通过zookeeper来进⾏实现的。使⽤:想将数据从mysql同步到es通过canal的⼀个adapter就可以直接实现了参考:canal优点:(1)canal实现数据的增量同步性能⾼,有可视化界⾯可监控(2)看canal esAdapter的源码可知,canal esAdater中的etl⽅法对全量同步的功能也已经做了实现,可增量可全量同步了

1.5、MQ与logstash实现ES与数据库同步区别Logstash实现ES与数据库同步:使⽤定时器⽅式 、实现简单MQ实现ES与数据库同步:实时性、复杂性更⾼、⼀致性强1.6、拼⾳分词器elasticsearch-analysis-pinyin略⼆、实战2.1、商品服务搜索如何实现商品服务搜索采⽤ElasticSearch实现需要考虑什么问题?(1)如何实现ES与mysql数据库同步A、使⽤logstash 把mysql 同步到 elasticsearch什么是logstash?Logstash是⼀个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来⾃不同数据源的数据统⼀起来,并将数据标准化到你所选择的⽬的地1.上传到服务中 –zxvf logstash-6.4.3

4. bin/logstash-plugin install logstash-input-jdbc5. bin/logstash-plugin install logstash-output-elasticsearch相关配置⽂件说明:jdbc_driver_library: jdbc mysql 驱动的路径,在上⼀步中已经下载jdbc_driver_class: 驱动类的名字,mysql 填 就好了jdbc_connection_string: mysql 地址jdbc_user: mysql ⽤户jdbc_password: mysql 密码schedule: 执⾏ sql 时机,类似 crontab 的调度use_column_value: 使⽤递增列的值tracking_column_type: 递增字段的类型,numeric 表⽰数值类型, timestamp 表⽰时间戳类型tracking_column: 递增字段的名称,这⾥使⽤ update_time 这⼀列,这列的类型是 timestamplast_run_metadata_path: 同步点⽂件,这个⽂件记录了上次的同步点,重启时会读取这个⽂件,这个⽂件可以⼿动statement: 要执⾏的 sql,以 “:” 开头是定义的变量,可以通过 parameters 来设置变量,这⾥的 sql_last_value 是内置的变量,表⽰上⼀次 sql 执⾏中 update_time 的B、多⽂件⽅式同步ES数据⼀个 logstash 实例可以借助 pipelines 机制同步多个表,只需要写多个配置⽂件就可以了案例:假设我们有两个表 table1 和 table2,对应两个配置⽂件 sync_ 和 sync_在 config/ 中配置:- : table1 : "config/sync_"- : table2 : "config/sync_"./bin/logstash(2)构建商品服务信息接⼝商品搜索服务接⼝:@GetMapping("/search") public BaseResponse> search(String productName);@Autowired private ProductReposiory productReposiory; @Override public BaseResponse> search(String productName) { BoolQueryBuilder builder = ery(); // 设置模糊搜索 (uery("name", productName)); List search = (List) (builder); List listReuslt = new ArrayList<>(); operties(search, listReuslt); return setResultSuccess(listReuslt); }public interface ProductReposiory extends ElasticsearchRepository {}@SpringBootApplication@EnableElasticsearchRepositories(basePackages = { "" })public class AppProduct { public static void main(String[] args) { (, args); }}配置⽂件:###服务启动端⼝号server: port: 8500###服务名称(服务注册到eureka名称)

eureka: client: service-url: defaultZone: localhost:8100/eureka

spring: application: name: app-mayikt-goods redis: host: 188.131.155.46 port: 6379 password: 123456 pool: max-idle: 100 min-idle: 1 max-active: 1000 max-wait: -1###数据库相关连接

datasource: username: root password: root driver-class-name: url: jdbc:mysql://127.0.0.1:3306/goods?useUnicode=true&characterEncoding=UTF- data: elasticsearch: ####集群名称 cluster-name: myes ####地址

cluster-nodes: 192.168.212.247:9300

2.2、基于Docker⽅式实现Elasticsearch集群注意:spring-boot-starter-data-elasticsearch必须为集群⽅式连接,否则情况下会报⼀下错误None of the configured nodes are available(1) -p es/ data3firewall-cmd --add-port=9300/tcpfirewall-cmd --add-port=9301/tcp6. mkdir plugins17. mkdir plugins2(2)在es/config分别放⼊、:Es1:: : _host: h_host: : : d: -origin: "*": true

: true

: ["192.168.212.252:9300","192.168.212.252:9301"]m_master_nodes: 1Es2:: : _host: h_host: : : d: -origin: "*": true

: true

: ["192.168.212.252:9300","192.168.212.252:9301"]m_master_nodes: 1(3)启动俩容器启动容器1:docker run -e ES_JAVA_OPTS="-Xms256m -Xmx256m" -d -p 9200:9200 -p 9300:9300 -p 5601:5601 -v /usr/local/es/config/:/usr/share/elasticsear启动容器2:docker run -e ES_JAVA_OPTS="-Xms256m -Xmx256m" -d -p 9201:9201 -p 9301:9301 -v /usr/local/es/config/:/usr/share/elasticsearch/config/elast(4)测试集群效果

2.3、SpringBoot整合Elasticsearch(1)创建服务项⽬shop-service-goods服务接⼝(2)创建商品搜索服务接⼝public interface ProductSearchService {@GetMapping("/search") public BaseResponse> search(String name);}(3)dto实体类@Datapublic class ProductDto { /** 主键ID */ private Integer id; /** 类型ID */ private Integer categoryId; /** 名称 */ private String name; /** ⼩标题 */ private String subtitle; /** 主图像 */ private String mainImage; /** ⼩标题图像 */ private String subImages; /** 描述 */ private String detail; /** 商品规格 */ private String attributeList; /** 价格 */ private Double price; /** 库存 */ private Integer stock; /** 状态 */ private Integer status; /** 乐观锁 */ private Integer revision; /** 创建⼈ */ private String createdBy; /** 创建时间 */ private Date createdTime; /** 更新⼈ */ private String updatedBy; /** 更新时间 */ private Timestamp updatedTime;}(4)maven依赖 spring-boot-starter-data-elasticsearch sl querydsl-apt sl querydsl-jpa orika-core 1.5.2 (5)业务实现@RestControllerpublic class ProductSearchServiceImpl extends BaseApiService> implements ProductSearchService { @Autowired private ProductReposiory productReposiory; @Override public BaseResponse> search(String name) {//1.拼接查询条件 BoolQueryBuilder builder = ery();//2.模拟查询name字段 (uery("name", name)); Pageable pageable = new QPageRequest(0, 5);//3.调⽤ES接⼝查询 Page page = (builder, pageable);//4.获取集合数据 List content = tent(); MapperFactory mapperFactory = new r().build();//5.将entity转换为dto List mapAsList = perFacade().mapAsList(content, ); return setResultSuccess(mapAsList); }}public interface ProductReposiory extends ElasticsearchRepository {}(6)实体@Document(indexName = "goods", type = "goods")@Datapublic class ProductEntity { /** 主键ID */ private Integer id; /** 类型ID */ private Integer categoryId; /** 名称 */ private String name; /** ⼩标题 */ private String subtitle; /** 主图像 */ private String mainImage; /** ⼩标题图像 */ private String subImages; /** 描述 */ private String detail; /** 商品规格 */ private String attributeList; /** 价格 */ private Double price; /** 库存 */ private Integer stock; /** 状态 */ private Integer status; /** 创建⼈ */ private String createdBy; /** 创建时间 */ private Date createdTime; /** 更新时间 */ private Timestamp updatedTime;}(7)配置⽂件###服务启动端⼝号server: port: 8500###服务名称(服务注册到eureka名称)

eureka: client: service-url: defaultZone: localhost:8100/eureka

spring: application: name: app-goods redis: host: redisIP地址 port: 6379 password: 123456 pool: max-idle: 100 min-idle: 1 max-active: 1000 max-wait: -1###数据库相关连接

datasource: username: root password: root driver-class-name: url: jdbc:mysql://127.0.0.1:3306/goods?useUnicode=true&characterEncoding=UTF- data: elasticsearch: ####集群名称 cluster-name: elasticsearch-cluster ####地址

cluster-nodes: IP地址:93002.4、Elasticsearch集成IK分词器如果没有整合IK分词,es⽆法⽀持模糊查询(1)ES⽂档类型映射GET /goods/_mappingDELETE /goodsPUT /goodsPOST /goods/_mapping/goods{ "goods": { "properties": { "@timestamp": { "type": "date" }, "@version": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "attribute_list": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "category_id": { "type": "long" }, "created_time": { "type": "date" }, "detail": { "type": "text", "analyzer":"ik_smart", "search_analyzer":"ik_smart" }, "id": { "type": "long" }, "main_image": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "name": { "type": "text", "analyzer":"ik_smart", "search_analyzer":"ik_smart" }, "revision": { "type": "long" }, "status": { "type": "long" }, "sub_images": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "subtitle": { "type": "text", "analyzer":"ik_smart", "search_analyzer":"ik_smart" }, "updated_time": { "type": "date" } } }}(2){ "analyzer": "ik_smart", "text": "苹果"}{ "analyzer": "standard", "text": "奥迪a4l"}

发布者:admin,转转请注明出处:http://www.yc00.com/web/1689534215a263959.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信