Allo,

J'ai entendu parler de ce forum francophone par des européens. Comme j'ai de la misère à trouver des informations sur l'intégration d'Elasticsearch avec SPARK je viens tenter ma chance ici.

J'ai suivis la documentation : http://www.elasticsearch.org/guide/e...ent/spark.html pour me connecter à ES en passant par un job spark. L'écriture de donnée fonctionne très bien.

Mon problème maintenant est de lire ces données.

mon mapping ES ressemble à ca :

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
{
    "transcript": {
        "properties": {
			"cruiseID": {
                "type": "string"
            },
			"diveID": {
                "type": "string"
            },
            "filename_root": {
                "type": "string"
            },
            "version": {
                "type": "string"
            },
            "id": {
                "type": "string"
            },
            "uuid": {
                "type": "string"
            },
            "status": {
                "type": "string"
            },
			
            "result": {
                "type": "nested",
                "properties": {
                    "begin_time": {
                        "type": "double"
                    },
                    "end_time": {
                        "type": "double"
                    },
                    "confidence": {
                        "type": "double"
                    },
					"location":{
						 "type":"geo_point"
					},
                    "word": {
                        "type": "string"
                    }
                }
            }
        }
    }
}
et mon code à ça :

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
val sc = new SparkContext(ELASTIC_CONF) // new spark context

  val configuration = new Configuration()
  configuration.set("es.nodes", "gtl-retd189")
  configuration.set("es.port", "9200")
  configuration.set("es.resource", resource)
  configuration.set("es.query", query)

  val esRDD = sc.newAPIHadoopRDD(configuration, classOf[EsInputFormat[org.apache.hadoop.io.Text, MapWritable]],classOf[org.apache.hadoop.io.Text], classOf[MapWritable])
  val results = esRDD.map(_._2.get(new org.apache.hadoop.io.Text("result")).toString).take(1)
At this point, j'arrive bien à retrouver des information que j'appellerai "de premier niveau" comme cruiseID , diveID ou uuid ... mais je n'arrive pas à gérer les collections comme "result". Ça à l'air que Elasticsearch me renvoie une RDD[WritableArrayWritable] pour les collections. Et mon problème est que je n'arrive pas à parcourir ce WritableArrayWritable facilement.

Quelqu'un a-t-il déjà utilisé SPARK et elasticsearch ensemble (le tout écrit en scala) ?

au plaisir,

Chris