Bonjour à tous,
Je suis débutant en spark streaming et je suis en train de faire quelques tests. J'envoi côté serveur des tweets récupérés via l'API Twitter et de l'autre côté, le client effectue un traitement dessus. Le problème est que je ne reçois pas instantanément les données côté client pour effectuer ce traitement. Avec un netcat (nc -lk localhost 5555) cela fonctionne mais avec ce que j'ai là aucun traitement ne se fait..
Serveur:
Client :
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42 class TweetsListener(StreamListener): def __init__(self, csocket): self.client_socket = csocket def on_data(self, data): try: msg = json.loads( data ) print( msg['text'].encode('utf-8') ) self.client_socket.send( msg['text'].encode('utf-8') ) print ("c'est bon") return True except BaseException as e: print("Error on_data: %s" % str(e)) return True def on_error(self, status): print(status) return True def sendData(c_socket): auth = OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_secret) twitter_stream = Stream(auth, TweetsListener(c_socket)) twitter_stream.userstream(_with="user") if __name__ == "__main__": s = socket.socket() # Create a socket object host = "localhost" # Get local machine name port = 5555 # Reserve a port for your service. s.bind((host, port)) # Bind to the port print("Listening on port: %s" % str(port)) s.listen(5) # Now wait for client connection. c, addr = s.accept() # Establish connection with client. print( "Received request from: " + str( addr ) ) sendData(c)
Comment puis-je régler ce problème?
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 sc = SparkContext(appName="Test") ssc = StreamingContext(sc, 10) socket_stream = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) #Creation window with the data in RDD created -- RDD data every 20 sec lines = socket_stream.window(20) print(lines) counts = (lines.flatMap(lambda line: line.split(" "))\ .filter(lambda word: word.lower().startswith("#")) \ .map(lambda word: (word.lower(),1)) \ .reduceByKey(lambda a, b: a+b)) counts.pprint() ssc.start() ssc.awaitTermination()
Merci à vous.
Partager