Bonjour à tous,

Je suis débutant en spark streaming et je suis en train de faire quelques tests. J'envoi côté serveur des tweets récupérés via l'API Twitter et de l'autre côté, le client effectue un traitement dessus. Le problème est que je ne reçois pas instantanément les données côté client pour effectuer ce traitement. Avec un netcat (nc -lk localhost 5555) cela fonctionne mais avec ce que j'ai là aucun traitement ne se fait..

Serveur:

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class TweetsListener(StreamListener):
 
  def __init__(self, csocket):
      self.client_socket = csocket
 
 
  def on_data(self, data):
      try:
          msg = json.loads( data )
          print( msg['text'].encode('utf-8') )
          self.client_socket.send( msg['text'].encode('utf-8') )
          print ("c'est bon")
          return True
      except BaseException as e:
          print("Error on_data: %s" % str(e))
      return True
 
  def on_error(self, status):
      print(status)
      return True
 
def sendData(c_socket):
  auth = OAuthHandler(consumer_key, consumer_secret)
  auth.set_access_token(access_token, access_secret)
 
  twitter_stream = Stream(auth, TweetsListener(c_socket))
  twitter_stream.userstream(_with="user")
 
if __name__ == "__main__":
  s = socket.socket()         # Create a socket object
  host = "localhost"      # Get local machine name
  port = 5555                # Reserve a port for your service.
  s.bind((host, port))        # Bind to the port
 
  print("Listening on port: %s" % str(port))
 
  s.listen(5)                 # Now wait for client connection.
  c, addr = s.accept()        # Establish connection with client.
 
  print( "Received request from: " + str( addr ) )
 
  sendData(c)
Client :

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
sc = SparkContext(appName="Test")
ssc = StreamingContext(sc, 10)
 
socket_stream = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
 
 
 
#Creation window with the data in RDD created -- RDD data every 20 sec
lines = socket_stream.window(20)
 
print(lines)
 
counts = (lines.flatMap(lambda line: line.split(" "))\
                  .filter(lambda word: word.lower().startswith("#")) \
  				  .map(lambda word: (word.lower(),1)) \
                  .reduceByKey(lambda a, b: a+b))
 
 
 
counts.pprint()
 
ssc.start()
ssc.awaitTermination()
Comment puis-je régler ce problème?

Merci à vous.