Elasticsearch之空间数据

  • A+
所属分类:大数据GIS

空间字段

针对空间数据建立空间索引,Elasticsearch会自动为每个字段自动建立索引,空间字段也是如此,Elasticsearch已经提供了几种类型的空间字段,下面主要针对空间字段进行讲解。空间字段主要分为两种类型,geo_point和geo_shape。ps,作者对于Elasticsearch研究的还不是很透彻,如果各位发现哪里有错误,欢迎在评论区留言。

geo_point

geo_point是为点要素专门设定的字段,字段内容填点要素的经纬度,该字段可以使用Elasticsearch以下几个功能。

  • 找出落在指定矩形框中的坐标点、找出与指定点在给定距离内的点、找出与指定点距离在给定最小距离和最大距离之间的点、找出落在多边形中的点。
  • 从地理位置角度或者依据距离中心点的距离进行空间聚合。
  • 按距离排序
  • 将距离整合到记录的相关性得分中

geo_shape

当数据中包含点要素以外的线、面等要素时,需要利用geo_shape类型。该字段将geo_json几何对象映射到geo_shape类型,如果要启用该类型,必须将某个字段定义为geo_shape类型。geo_shape有两种构建索引的方式,分别是geohash索引和四叉树索引,同时需要设置索引的建立精度。

定义mapping

采用如下语句定义mapping,其中包含了geo_shape和geo_point。本文主要对浮动车数据建立数据表,因此,geo_point主要存储起止点数据,点要素格式;geo_shape保存轨迹数据,线要素格式。

{
	"settings":{
		"number_of_shards": 3,
		"number_of_replicas": 1
	},
	"mappings":{
		"Trace":{
			"properties":{
				
				"id":{
					"type":"keyword"
				},
				
				"taxiId":{
					"type":"keyword"
				},
				
				"startTime":{
					"type":"date",
					"format":"yyyy-MM-dd HH:mm:ss"
				},
				
				"startTimeIndex":{
					"type":"integer"
				},
				
				"endTime":{
					"type":"date",
					"format":"yyyy-MM-dd HH:mm:ss"
				},
				
				"endTimeIndex":{
					"type":"integer"
				},
				
				"averageSpeed":{
					"type":"integer"
				},
				
				"length":{
					"type":"double"
				},
				
				"emptyOrFillStatus":{
					"type":"integer"
				},
				
				"startPoint": {
					"type": "geo_point"
				},
				
				"endPoint": {
					"type": "geo_point"
				},
				
				"pointList": {
					"type": "geo_shape",
					"tree":"geohash",
					"precision": "10km"
				}
			}
		}	
	}
}

数据库结构图

Elasticsearch之空间数据

插入数据后,格式如下,可以发现,空间字段并没有显示出来,但实际上,数据已经保存在数据库中。

Elasticsearch之空间数据

插入数据

本文插入数据的代码采用java代码编写,主要讲解如何用java插入geo_point和geo_shape类型的数据,网上基本没有对这种方法的讲解,亲测有效。

Map<String,Object> jsonMap = new HashMap<String,Object>();
//创建一个map,用于存储数据,每一个jsonMap即为一条记录,string为字段名称,object为数据封装对象
double[] endPoint = {Double.parseDouble(lon_lat_end[0]),Double.parseDouble(lon_lat_end[1])};
//一维double数组,第一位为经度,第二位为纬度
jsonMap.put("endPoint",endPoint);
//将数据加入map中
double[][] ptsArray = new double[ptList.size()][2];
//ptList为轨迹数据列表,列表中每个元素为一个轨迹点,用经度+“,”+纬度形式表示
//采用二维数组表示轨迹数据
for (int i=0;i<ptList.size();i++) {
     String ptstr = ptList.get(i);
     String[] lon_lat = ptstr.split(",");
     ptsArray[i][0] = Double.parseDouble(lon_lat[0]);//经度
     ptsArray[i][1] = Double.parseDouble(lon_lat[1]);//纬度
}
Map<String,Object> pointlistMap = new HashMap<String,Object>();
//专门为geoshape类型定义map,主要是因为在插入记录时必须指定geoshape类型,如这里使用的是“lingestring”
pointlistMap.put("type","linestring");
pointlistMap.put("coordinates",ptsArray);
jsonMap.put("pointList",pointlistMap);//将数据加入结果map中
List<Map<String, Object>> dataList = new ArrayList<Map<String, Object>>();
//定义一个存放jsonMap的list,每个jsonMap即代表一条数据记录
List idList = new ArrayList();//这里指定每条记录的id,不使用系统自动生成的id
dataList.add(jsonMap);//使用for循环,将所有的jsonmap都加入到dataList中
idList.add(id);
TransportClient client = EsClient.getConnection();//获取Elasticsearch的连接
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
//一定要采用批处理,极大程度加快插入速度,我这里1s可以插入10000条
//批处理一次性处理的数量可以自己定,主要是在下面的for循环外在加一层for循环进行批处理控制
for (int i = 0; i < dataList.size(); i++) {
    bulkRequestBuilder.add(client.prepareIndex("20170307trace", "Trace",idList.get(i)).setSource(dataList.get(i)));
}
bulkRequestBuilder.get();

Elasticsearch之空间数据

空间聚合查询

这里主要讲解空间聚合查询,使用java语言,将标识为空间类型的字段进行空间聚合

public List<double[]> searchUpHeatPoints(String ptLeftTop, String ptRightBottom, String startTime, String endTime) {
    long startFuncTime = System.currentTimeMillis();
    TransportClient client = EsClient.getConnection();
    
    //查询条件
    QueryBuilder queryBuilder = QueryBuilders.boolQuery()
            .must(QueryBuilders.termQuery("emptyOrFillStatus", 1))
            .must(QueryBuilders.rangeQuery("startTime").from("2017-03-07 17:00:00").to("2017-03-07 19:00:00"));
    
    //聚合条件
    AggregationBuilder aggregation =
            AggregationBuilders
                    .geohashGrid("agg")
                    .field("startPoint")
                    .precision(7);

    //定义查询、加入条件,并开启查询
    SearchResponse response = client.prepareSearch("20170307trace")
            .setTypes("Trace")
            .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(queryBuilder)                 // Query
            .addAggregation(aggregation)
            //.setSize(100)//这里不要设置size,聚合查询不要设置size
            .get();
    System.out.println(response.getHits().getTotalHits() + "个");// !总! 记录数,总记录数

    List<double[]> result = new ArrayList<double[]>();
    //遍历聚合结果
    GeoHashGrid agg = response.getAggregations().get("agg");
    for ( GeoHashGrid.Bucket entry : agg.getBuckets()) {
        //String keyAsString = entry.getKeyAsString(); // key as String
        GeoPoint key = (GeoPoint) entry.getKey();    // key as geo point
        long docCount = entry.getDocCount();         // Doc count
        double[] bd09 = CoordinateTransformUtil.wgs84tobd09(key.getLon(), key.getLat());
        //这里做了个wgs-84到百度坐标的转换,可以不用管
        result.add(new double[]{bd09[0], bd09[1], docCount});
    }
    long endFuncTime = System.currentTimeMillis();    //获取结束时间
    System.out.println("运行耗时"+(endFuncTime - startFuncTime)/1000.0 + "秒"); 
    //表中记录12万条,聚合的结果集近20000个,耗时不到200毫秒;
    //依我估测,哪怕数据量为千万条,耗时应该也在1s内,个人感觉,es对聚合的支持非常非常棒,very good!!!
    return result;//接下来就可以用百度地图做热力图了,非常方便
}

聚合结果如下图所示。

Elasticsearch之空间数据

  • 我的微信
  • weinxin
  • 微信公众号
  • weinxin
阿拉灯aladeng

发表评论

您必须才能发表评论!