Allo,
J'ai entendu parler de ce forum francophone par des européens. Comme j'ai de la misère à trouver des informations sur l'intégration d'Elasticsearch avec SPARK je viens tenter ma chance ici.
J'ai suivis la documentation : http://www.elasticsearch.org/guide/e...ent/spark.html pour me connecter à ES en passant par un job spark. L'écriture de donnée fonctionne très bien.
Mon problème maintenant est de lire ces données.
mon mapping ES ressemble à ca :
et mon code à ça :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 { "transcript": { "properties": { "cruiseID": { "type": "string" }, "diveID": { "type": "string" }, "filename_root": { "type": "string" }, "version": { "type": "string" }, "id": { "type": "string" }, "uuid": { "type": "string" }, "status": { "type": "string" }, "result": { "type": "nested", "properties": { "begin_time": { "type": "double" }, "end_time": { "type": "double" }, "confidence": { "type": "double" }, "location":{ "type":"geo_point" }, "word": { "type": "string" } } } } } }
At this point, j'arrive bien à retrouver des information que j'appellerai "de premier niveau" comme cruiseID , diveID ou uuid ... mais je n'arrive pas à gérer les collections comme "result". Ça à l'air que Elasticsearch me renvoie une RDD[WritableArrayWritable] pour les collections. Et mon problème est que je n'arrive pas à parcourir ce WritableArrayWritable facilement.
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10 val sc = new SparkContext(ELASTIC_CONF) // new spark context val configuration = new Configuration() configuration.set("es.nodes", "gtl-retd189") configuration.set("es.port", "9200") configuration.set("es.resource", resource) configuration.set("es.query", query) val esRDD = sc.newAPIHadoopRDD(configuration, classOf[EsInputFormat[org.apache.hadoop.io.Text, MapWritable]],classOf[org.apache.hadoop.io.Text], classOf[MapWritable]) val results = esRDD.map(_._2.get(new org.apache.hadoop.io.Text("result")).toString).take(1)
Quelqu'un a-t-il déjà utilisé SPARK et elasticsearch ensemble (le tout écrit en scala) ?
au plaisir,
Chris
Partager