Bonjour à vous,

J'ai lus tous les tutoriaux possibles, je comprend toujours pas CELERY...
J'ai tellement de questions.. êtes-vous prêt ?

J'utilisais mon propre serveur RabbitMQ pour les tests , jusqu'à ce que je vois https://www.cloudamqp.com/

Avec Pika, je suis capable de créer des échanges direct, topic, fanout..
Avec Kombu aussi. ( Celery se sert de Kombu )

Mais celery? dois-je passer par kombu? pika?
Pour dans les documentations, ils disent : CELERY_QUEUES et CELERY_ROUTING

"Celery received 'topic' but current is 'direct'

Voici un petit example simple:

tasks.py
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 
#tasks.py
from celery import Celery
from celery.task import task
 
THEHOST = 'amqp://the_address_to_server'
 
app = Celery('tasks', broker=THEHOST)
 
@app.task(name="add", queue="add")
def add(x, y):
  return x + y
 
@app.task(name="multi", queue="multi")
def multi(x):
  return x * x
 
@app.task(name="autre", queue="autre")
def autre(x):
  return x + x * x / 2
celeryconfig.py
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 
#celeryconfig.py
BROKER_URL = "the_address_to_server"
CELERY_IMPORTS = ('tasks')
CELERY_IGNORE_RESULT = False
CELERY_RESULT_BACKEND = 'amqp'
CELERY_IMPORTS = ('tasks', )
 
CELERY_DEFAULT_QUEUE = "default"
 
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic" # sert à rien Tout sort en DIRECT..
CELERY_DEFAULT_ROUTING_KEY = "task.default"
 
#Ensuite j'ai tellement jouer avec les paramètres. 
#Es-ce correct? J'ai l'impression de me répéter.
#Pourtant ils sortent tous en DIRECT quand même.
 
default_exchange = Exchange("default", type='direct')
normal_exchange = Exchange("normal", type='topic')
special_exchange = Exchange("special", type='topic')
 
CELERY_QUEUES = {
  Queue("default", routing_key="default", default_exchange),
  Queue("add", routing_key="add", normal_exchange),
  Queue("multi", routing_key="multi", special_exchange),
  Queue("autre", routing_key="autre", special_exchange),
}
CELERY_ROUTES = {
  'default': {
    'queue': 'default',
    'exchange': 'tasks',
    'exchange_type': 'direct',
    'routing_key': 'default',
  },
  'add': {
    'queue': 'add',
    'exchange': 'tasks',
    'exchange_type': 'topic',
    'routing_key': 'add',
  },
  'multi': {
    'queue': 'multi',
    'exchange': 'tasks',
    'exchange_type': 'topic',
    'routing_key': 'multi',
  },
  'autre': {
    'queue': 'autre',
    'exchange': 'tasks',
    'exchange_type': 'topic',
    'routing_key': 'autre',
  },
}
client.py
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 
#client.py
#C'est ici aussi que je galère.
#Es-ce avec Celery ?
 
#Dois-je importer Pika? ou Kombu?
 
#Si je fais:
from celery import Celery
#Si j'importe aussi les tasks
from tasks import add, multi, autre
 
#Les commandes possible?
#CMD 1
add.delay(1,3) 
#CMD 2
add.apply_async(
  args=[1,5],
  queue='add',
  routing_key='add',
)
#CMD 3
celery.send_task("tasks.add", [1, 7]
#Chaque fois que je réussis à faire communiquer les workers, j'obtiens toujours là réponse "Celery received 'topic' but current is 'direct' "
Je sais qu'ils manquent des éléments, mais je suis trop mélangé.

J'ai modifié les tâches seulement pour mieux comprendre. Donc, ce qui suit après est mon vrai problème. Mais un simple example
pour les échangess TOPIC serait apprécié.

J'ai deux ordinateurs, pour l'instant:
OrdinateurA = (2cores) Worker et Client ( je prend le 2cores en client puisque la moyenne des ordinateurs ont 2 cores..)
OrdinateurB = (8cores) Worker

J'aimerais simplement un petit example de tâche distribuer ( avoir les (2+8=(10) cores ensembles ) es-ce possible?
L'échange Direct, Fait travailler les workers, un à la fois.

Pour des testes, j'ai créer un fichier texte avec plus de 100,000,000 lignes ( pour ne pas faire des time.sleep(), pour faire semblant.. )

J'aimerais que tous les workers compte ensemble, le nombre de lignes...
Donc, aussi comment connaitre le nombre de cores totales, il y a entre les nœuds?

Ensuite, Seulement le worker client, divise le fichier en morceaux, selon le nombre de cores total trouvé.

Il pourra ensuite, soit retourner en listes chaque mot d'un morceau de fichier à la fois,
soit directement envoyer ( téléchargement ) le morceau désiré. si celery le permet.


Si c'est pas clair, posez ma des questions.
Quelqu'un pourrais m'aider?
Je trouve pas de tutorial avec Celery, sauf ça propre documentation.

Dois-je contourner Celery avec Pika ou Kombu?

Merci encore pour tout.
Merci de votre aide.