Multiprocessing not compatible with Flet #4283

Open
ap4499 opened this issue Oct 31, 2024 · 9 comments
@ap4499

ap4499 commented Oct 31, 2024

Here is an example of the issue I am seeing with multiprocessing: the GUI is effectively duplicated whenever multiprocessing is used. The reason is that Python's multiprocessing uses sys.executable to locate the Python interpreter that runs the new processes/workers. (see comment)

When a Flet application is built, sys.executable by default points to the application executable, as discussed. Hence this behaviour is only observed in applications built with Flet Build, not with Flet Run. Furthermore, PyInstaller seems to have fixed this, so it isn't present in Flet Pack.

Application:

  • Basic sort of the same list of numbers, but across multiple workers (processes).
  • Using a multiprocessing.Queue() to pass information.
  • Running the long running task using page.run_thread()
  • Updating the GUI only from the main thread.
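The pattern in the list above can be sketched without Flet, as a rough illustration only. Here a plain thread and queue.Queue stand in for page.run_thread() and multiprocessing.Queue(), and None is used as an end-of-work sentinel so the consuming loop can finish. The names worker, consume_progress and on_progress are hypothetical and not part of Flet.

```python
import queue
import threading


def worker(progress_queue, number_of_tasks):
    """Background task: report one unit of progress per finished task."""
    for _ in range(number_of_tasks):
        progress_queue.put(1)
    progress_queue.put(None)  # sentinel: no more progress will arrive


def consume_progress(progress_queue, on_progress):
    """GUI side: apply updates until the sentinel is seen."""
    completed = 0
    while True:
        item = progress_queue.get()
        if item is None:
            break
        completed += item
        on_progress(completed)  # in a GUI: set the text value and refresh it
    return completed


seen = []
q = queue.Queue()
t = threading.Thread(target=worker, args=(q, 3))
t.start()
total = consume_progress(q, seen.append)
t.join()
```

Unlike a bare while True loop, the sentinel lets the click handler return once all work has been reported.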

Behaviour:

  • When running from the terminal with Flet Run, the application behaves as expected.
  • When running the .app created with Flet Build (using your latest dev build from yesterday, dev3598), there are two behaviours:
      • As soon as the application is launched, another GUI window appears. I believe this happens when the multiprocessing.Queue() is created.
      • When the actual processing starts, a window is created for each worker that multiprocessing uses.

#main.py

import flet as ft
from flet import Page, Text, ElevatedButton
import multiprocessing  # needed for multiprocessing.freeze_support() below
from multiprocessing import Queue
import numpy as np
from concurrent.futures import ProcessPoolExecutor, as_completed


def sort_sublist(sublist):
    """Sorts a sublist of numbers."""
    return sorted(sublist)


def parallel_sort(queue_var):
    """Sorts a list of numbers in parallel."""
    num_elements = 10_000_000
    data = np.random.rand(num_elements).tolist()
    number_of_sorts = 20

    with ProcessPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(sort_sublist, data) for _ in range(number_of_sorts)]  # Sort the same list x times
        for future in as_completed(futures):
            queue_var.put(1) 


def main(page: Page):
    page.title = "Flet Background Task Example"
    queue_var = Queue()
    text = ft.Text("Init")
    

    def update_text():
        completed = 0
        while True:
            completed += queue_var.get()
            text.value = completed
            text.update()

    def on_click(e):
        page.run_thread(parallel_sort,queue_var)
        update_text()


    page.add(
        ElevatedButton("Start Loop", on_click=on_click),
    )
    page.add(
        text
    )

if __name__ == "__main__":
    multiprocessing.freeze_support()
    ft.app(target=main)

#requirements.txt

flet==0.25.0.dev3598
numpy

Originally posted by @ap4499 in #4252 (comment)

@ap4499
Author

ap4499 commented Oct 31, 2024

I think multiprocessing exhibits the same issue as subprocess for the reason you have already identified: it relies on sys.executable, and in a Flet application that points at the app itself rather than at a Python interpreter.

multiprocessing.set_executable(executable)
Set the path of the Python interpreter to use when starting a child process. (By default sys.executable is used).

Embedders will probably need to do some thing like:

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
https://docs.python.org/3/library/multiprocessing.html
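A minimal sketch of how the documented set_executable() call might be wrapped follows. It assumes that a standalone interpreter exists somewhere the app can reach, which may not hold for a Flet Build bundle. The helper name point_workers_at is hypothetical.

```python
import multiprocessing
import multiprocessing.spawn
import os
import sys


def point_workers_at(interpreter_path):
    """Tell multiprocessing which interpreter to launch for spawned children.

    Raises FileNotFoundError instead of silently accepting a bad path.
    """
    if not os.path.isfile(interpreter_path):
        raise FileNotFoundError(interpreter_path)
    multiprocessing.set_executable(interpreter_path)
    # get_executable() may return bytes on some platforms, so normalise it.
    return os.fsdecode(multiprocessing.spawn.get_executable())


# In a normal (unbundled) run sys.executable is already a real interpreter,
# so this call changes nothing; a bundled app would pass another path here.
configured = point_workers_at(sys.executable)
```

The setting only affects start methods that launch a fresh interpreter (such as spawn), and it would need to run before any worker is created.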

@ap4499
Author

ap4499 commented Oct 31, 2024

I suspect all applications that use multiprocessing will need the following line as the first line in the if __name__ == "__main__": block, but I'm unsure whether anything is required on the dev side.

multiprocessing.freeze_support()

@FeodorFitsner FeodorFitsner self-assigned this Nov 1, 2024
@ap4499
Author

ap4499 commented Nov 1, 2024

I am using the new dev build that you provided for subprocess, and can confirm that the sys.argv are coming through.

It seems that multiprocessing does pass its arguments via sys.argv, and all of them appear to contain "multiprocessing" (so I use an "in" check inside an IF to filter them).

However, I have been unable to use this to get multiprocessing working in the way that I thought it should work.

In the below code, I have moved the import of Flet to be conditional, but even so - when the button is clicked, the GUI appears to duplicate.

Even if multiprocessing was not getting what it needed, I would expect the program below simply to do nothing in that case. I suspect that multiprocessing may have an issue separate from the GUI duplication, as the console.log file contains the following:

./multiprocessing/resource_tracker.py:123: UserWarning: resource_tracker: process died unexpectedly, relaunching. Some resources might leak.

But I am still puzzled why the GUI duplicates.

import logging    
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("flet_core")
logging.getLogger("flet")

import sys
import os    
argv = str(sys.argv )
logging.debug(f"sys.argv: {argv}")
orig_argv = str(sys.orig_argv)
logging.debug(f"sys.orig_argv: {orig_argv}")



from multiprocessing import Queue
import numpy as np
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing


def sort_sublist(sublist):
    """Sorts a sublist of numbers."""
    return sorted(sublist)


def parallel_sort(queue_var):
    """Sorts a list of numbers in parallel."""
    num_elements = 10_000_000
    data = np.random.rand(num_elements).tolist()
    number_of_sorts = 20

    with ProcessPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(sort_sublist, data) for _ in range(number_of_sorts)]  # Sort the same list x times
        for future in as_completed(futures):
            queue_var.put(1) 


def main(page):
    import flet as ft
    page.title = "Flet Background Task Example"
    queue_var = Queue()
    text = ft.Text("Init")
    

    def update_text():
        completed = 0
        while True:
            completed += queue_var.get()
            text.value = completed
            text.update()

    def on_click(e):
        page.run_thread(parallel_sort,queue_var)
        update_text()


    page.add(
        ft.ElevatedButton("Start Loop", on_click=on_click),
    )
    page.add(
        text
    )
    page.add(ft.Text(f"sys.argv: {str(sys.argv)}"))

if __name__ == "__main__":
  
    multiprocessing.freeze_support()

    if 'multiprocessing' in ' '.join(sys.argv):
        argv = str(sys.argv )
        logging.debug(f"Multiprocessing hit! sys.argv: {argv}")
        pass
    else:
        argv = str(sys.argv )
        logging.debug(f"LINE HIT! sys.argv: {argv}")
        import flet as ft


        ft.app(target=main)

Build commands used:

pip freeze > temp.txt
pip uninstall -r temp.txt -y 
pip install flet==0.25.0.dev3647

export PATH=$HOME/development/flutter/bin:$PATH

flet build macos --project "Demo" --product "Demo" --org "com.ABC" --company "Demo" --build-version "1.0.0" --template-ref 0.25.0-dev

@FeodorFitsner
Contributor

I don't see multiprocessing in args. I get this when clicking "Start Loop" button:

myapp.app -OO -B -s -c from multiprocessing.resource_tracker import main;main(25)

@ap4499
Author

ap4499 commented Nov 5, 2024

I get the same. The IF statement is intended to route any argument containing "multiprocessing" away from launching the Flet app or importing Flet.

In the above, any time "multiprocessing" is found in sys.argv, the process is routed away from Flet. Multiprocessing seems to pass further arguments after the resource tracker, once it fires off the worker processes, but all of the arguments it passes have one thing in common: they contain "from multiprocessing".

The below is an example of what I was intending. Whenever "Python" is found within the joined args, we enter the IF. (Of course, this uses orig_argv instead, as by default it gives us an extra arg with which to demonstrate the join behaviour.)

import sys

orig_argv = str(sys.orig_argv)
print("The original argv: ",orig_argv)

orig_argv_joined = ' '.join(sys.orig_argv)
print("The joined orig_argv: ",orig_argv_joined)


if 'Python' in orig_argv_joined:
    print('Python is in argv')

(.venv) alexproctor@Alexs-MacBook-Pro Multi_test % /Users/alexproctor/Documents/GitHub/Multi_test/.venv/bin/python /Users/alexproctor/Documents/GitHub/Multi_test/fttest.py
The original argv:  ['/Library/Frameworks/Python.framework/Versions/3.12/Resources/Python.app/Contents/MacOS/Python', '/Users/alexproctor/Documents/GitHub/Multi_test/fttest.py']
The joined orig_argv:  /Library/Frameworks/Python.framework/Versions/3.12/Resources/Python.app/Contents/MacOS/Python /Users/alexproctor/Documents/GitHub/Multi_test/fttest.py
Python is in argv
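A slightly stricter variant of the substring check could key on the "-c" flag followed by code that starts with "from multiprocessing", which is the shape of the command line reported earlier in this thread. This is only a sketch; the function name is_multiprocessing_child is hypothetical, and other start methods or Python versions may pass different arguments.

```python
def is_multiprocessing_child(argv):
    """Return True if argv looks like a multiprocessing helper launch.

    Looks for a "-c" flag whose following argument starts with
    "from multiprocessing", as in the resource tracker command line.
    """
    for flag, payload in zip(argv, argv[1:]):
        if flag == "-c" and payload.lstrip().startswith("from multiprocessing"):
            return True
    return False


# Argument list as reported in this thread when "Start Loop" is clicked.
child_argv = [
    "myapp.app", "-OO", "-B", "-s", "-c",
    "from multiprocessing.resource_tracker import main;main(25)",
]
# A normal launch of the app with no extra arguments.
parent_argv = ["myapp.app"]
```

Matching on the flag and its payload avoids false positives when "multiprocessing" merely appears in a file path.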

@FeodorFitsner
Contributor

OK, I've played a bit, managed to fix some issues in Flet build template and got multiprocessing "partially" working. Here is my last example:

import logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("flet_core")
logging.getLogger("flet")

import os
import sys

argv = str(sys.argv)
logging.debug(f"sys.argv: {argv}")
orig_argv = str(sys.orig_argv)
logging.debug(f"sys.orig_argv: {orig_argv}")
logging.debug(f"env vars: {os.environ}")

import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Queue

import numpy as np


def sort_sublist(sublist):
    """Sorts a sublist of numbers."""
    return sorted(sublist)


def parallel_sort(queue_var):
    """Sorts a list of numbers in parallel."""
    num_elements = 10_000_000
    data = np.random.rand(num_elements).tolist()
    number_of_sorts = 20

    with ProcessPoolExecutor(max_workers=1) as executor:
        futures = [
            executor.submit(sort_sublist, data) for _ in range(number_of_sorts)
        ]  # Sort the same list x times
        for future in as_completed(futures):
            queue_var.put(1)


def main(page):
    import flet as ft

    page.title = "Flet Background Task Example"
    queue_var = Queue()
    text = ft.Text("Init")

    def update_text():
        completed = 0
        while True:
            completed += queue_var.get()
            text.value = completed
            text.update()

    def on_click(e):
        page.run_thread(parallel_sort, queue_var)
        update_text()

    def window_event(e):
        print(e)
        if e.data == "close":
            sys.exit(0)
            page.window.destroy()

    page.window.prevent_close = True
    page.window.on_event = window_event

    page.add(
        ft.ElevatedButton("Start Loop", on_click=on_click),
    )
    page.add(
        ft.ElevatedButton("Quit app", on_click=lambda _: sys.exit(0)),
    )
    page.add(text)
    page.add(ft.Text(f"sys.argv: {str(sys.argv)}"))
    page.add(ft.Text(os.getenv("FLET_HIDE_WINDOW_ON_START")))


if __name__ == "__main__":

    multiprocessing.freeze_support()

    c_arg = "-c"
    c_arg_provided = False
    if c_arg in sys.argv:
        c_arg_idx = sys.argv.index(c_arg)
        if c_arg_idx < len(sys.argv) - 1:
            c_arg_provided = True
            exec(sys.argv[c_arg_idx + 1])

    if not c_arg_provided:
        os.environ["FLET_HIDE_APP_ON_START"] = "true"
        import flet as ft

        ft.app(target=main)

So, some improvements:

  1. Setting FLET_HIDE_APP_ON_START environment variable forces all child processes to open with a hidden window - no more GUI in workers.
  2. Icon is hidden on macOS dock though there is a "blink" as it's getting hidden programmatically from the code.

-c from multiprocessing... is a command it passes to a child python interpreter hence exec() in my code.
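The "-c" handling can be isolated into a small helper, sketched here under the assumption that the child's code always follows the "-c" flag directly. The name run_child_command and the runner parameter are hypothetical; runner exists only so the logic can be exercised without executing anything.

```python
def run_child_command(argv, runner=None):
    """Execute the code that follows "-c" in argv, if there is any.

    Returns True when a command was found and run, so the caller
    knows not to start the GUI in this process.
    """
    if "-c" not in argv:
        return False
    idx = argv.index("-c")
    if idx >= len(argv) - 1:
        return False  # "-c" was the last argument: nothing to run
    code = argv[idx + 1]
    if runner is None:
        # Fresh namespace, similar to what `python -c` would provide.
        exec(code, {"__name__": "__main__"})
    else:
        runner(code)
    return True


executed = []
handled = run_child_command(
    ["myapp.app", "-B", "-c", "x = 1 + 1"], runner=executed.append
)
not_handled = run_child_command(["myapp.app"], runner=executed.append)
```

A caller would start the GUI only when the helper returns False, mirroring the c_arg_provided flag in the listing above.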

What I can't understand is why it's holding child processes hanging after parallel_sort() call.
If you run the program you'll see that there are two processes with the same name. If you close the window or click "Quit app" it will terminate both the main process and a child process.

However, after clicking "Start Loop" a 3rd process is started (I limited it to 1 worker), and then neither closing the window nor clicking "Quit app" terminates the child processes.

Looking at multiprocessing module sources I understand it communicates with child worker processes via pipes (so it passes pipe name/number in from multiprocessing.resource_tracker import main;main(25)).

How to drop child processes? Should you call some dispose/cleanup/release logic in your app?

Hope that helps.
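On the clean-up question, one possible hook is sketched below. It has not been tested against the hang described here, and it only covers processes that multiprocessing reports through active_children(); anything launched outside that bookkeeping is not touched. The name terminate_children is hypothetical.

```python
import multiprocessing


def terminate_children(timeout=1.0):
    """Terminate every live child started through multiprocessing.

    Returns the number of children that were asked to stop.
    """
    children = multiprocessing.active_children()
    for child in children:
        child.terminate()
    for child in children:
        child.join(timeout)  # reap the process; give up after `timeout` seconds
    return len(children)


# With no workers running this is a no-op that reports zero children.
stopped = terminate_children()
```

Such a function could be called from the window close handler before exiting the main process.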

@ap4499
Author

ap4499 commented Nov 6, 2024

I've tried the above code on both Windows and Mac, and when the app is built, it shows a blank screen on both (even after the initialisation).

The below is my Windows command, in which I clear the cache (I like the proposed change btw), and then build it using the latest Flet.

Remove-Item -Path "build" -Recurse -Force -ErrorAction SilentlyContinue

pip freeze > temp.txt
pip uninstall -r temp.txt -y 
pip install flet==0.25.0.dev3673 flet-cli flet-core flet-desktop --no-cache-dir

flet build windows --project "Demo" --product "Demo" --org "com.ABC" --company "Demo" --build-version "1.0.0" --template-ref 0.25.0-dev

requirements.txt

flet==0.25.0.dev3673
flet-core
flet-desktop
numpy
pandas

In the code below I have changed only two lines for debugging purposes. When run with flet run main.py, the application cannot be exited, just as in your example above (it doesn't run at all when built with Flet Build).

import logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("flet_core")
logging.getLogger("flet")

import os
import sys
argv = str(sys.argv)
logging.debug(f"sys.argv: {argv}")
orig_argv = str(sys.orig_argv)
logging.debug(f"sys.orig_argv: {orig_argv}")
logging.debug(f"env vars: {os.environ}")

import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Queue

import numpy as np

#################CHANGE
import flet as ft


def sort_sublist(sublist):
    """Sorts a sublist of numbers."""
    return sorted(sublist)


def parallel_sort(queue_var):
    """Sorts a list of numbers in parallel."""
    num_elements = 10_000_000
    data = np.random.rand(num_elements).tolist()
    number_of_sorts = 20

    with ProcessPoolExecutor(max_workers=1) as executor:
        futures = [
            executor.submit(sort_sublist, data) for _ in range(number_of_sorts)
        ]  # Sort the same list x times
        for future in as_completed(futures):
            queue_var.put(1)


def main(page):
    import flet as ft

    page.title = "Flet Background Task Example"
    queue_var = Queue()
    text = ft.Text("Init")

    def update_text():
        completed = 0
        while True:
            completed += queue_var.get()
            text.value = completed
            text.update()

    def on_click(e):
        page.run_thread(parallel_sort, queue_var)
        update_text()

    def window_event(e):
        print(e)
        if e.data == "close":
            sys.exit(0)
            page.window.destroy()

    page.window.prevent_close = True
    page.window.on_event = window_event

    page.add(
        ft.ElevatedButton("Start Loop", on_click=on_click),
    )
    page.add(
        ft.ElevatedButton("Quit app", on_click=lambda _: sys.exit(0)),
    )
    page.add(text)
    page.add(ft.Text(f"sys.argv: {str(sys.argv)}"))
    page.add(ft.Text(os.getenv("FLET_HIDE_WINDOW_ON_START")))


if __name__ == "__main__":
    #################CHANGE
    ft.app(target=main)

    # multiprocessing.freeze_support()

    # c_arg = "-c"
    # c_arg_provided = False
    # if c_arg in sys.argv:
    #     c_arg_idx = sys.argv.index(c_arg)
    #     if c_arg_idx < len(sys.argv) - 1:
    #         c_arg_provided = True
    #         exec(sys.argv[c_arg_idx + 1])

    # if not c_arg_provided:
    #     os.environ["FLET_HIDE_APP_ON_START"] = "true"
    #     import flet as ft

    #     ft.app(target=main)

Terminal output when clicking the "X" in the Windows native toolbar.

DEBUG:flet:_on_message: {"action":"updateControlProps","payload":{"props":[{"i":"page","windowwidth":"1280.0","windowheight":"720.0","windowtop":"9.777777777777779","windowleft":"9.777777777777779","windowminimized":"false","windowmaximized":"false","windowfocused":"true","windowfullscreen":"false"}]}}
DEBUG:flet:page.on_event_async: page change [{"i":"page","windowwidth":"1280.0","windowheight":"720.0","windowtop":"9.777777777777779","windowleft":"9.777777777777779","windowminimized":"false","windowmaximized":"false","windowfocused":"true","windowfullscreen":"false"}]
DEBUG:flet:_on_message: {"action":"pageEventFromWeb","payload":{"eventTarget":"page","eventName":"window_event","eventData":"close"}}
DEBUG:flet:page.on_event_async: page window_event close
WindowEvent(name='window_event', type=<WindowEventType.CLOSE: 'close'>, data='close')
ERROR:asyncio:Future exception was never retrieved
future: <Future finished exception=SystemExit(0)>
Traceback (most recent call last):
  File "c:\Users\ampro\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Python\Imaging_proj\Flet\.venv\Lib\site-packages\flet\core\page.py", line 944, in wrapper
    handler(*args)
  File "c:\Python\Imaging_proj\Flet\main.py", line 63, in window_event
    sys.exit(0)
SystemExit: 0

@ap4499
Author

ap4499 commented Nov 25, 2024

I am still encountering an issue when the application is built: I can't get it to launch the multiprocessing workers.

The above issue where the window would not launch has been resolved by one of the intermediate Flet versions though.

Observed behavior:

Using Flet Run:

  • Multiprocessing works, and the CSV files are written. I added the CSV step because on an earlier debug run (using Build) I noticed that the counter increased while the CPU load was too low for the numbers to actually be sorted. The CSV output confirmed the theory (i.e. no files were written under Build). The bug you noted, where the process can't be exited, is still there, but it is less problematic for me as it doesn't appear to be present with Flet Build and the final executable.

Flet Build:

  • With the final executable, the sort does not occur: no CSV files are produced and the GUI is not updated.

Workaround considered:

  • I have a development deadline imminent, and I am considering wrapping my multiprocessing code in a cx_freeze executable, which is piped and called from the Flet application.
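The piping part of such a workaround might look roughly like the sketch below, which sends a JSON payload to a separate executable on stdin and reads JSON back from stdout. The JSON protocol and the name call_worker are assumptions made for illustration. The demo command uses a plain Python interpreter as a stand-in for the frozen worker executable.

```python
import json
import subprocess
import sys


def call_worker(command, payload, timeout=60):
    """Send `payload` as JSON on stdin and parse JSON from the worker's stdout."""
    completed = subprocess.run(
        command,
        input=json.dumps(payload),
        capture_output=True,
        text=True,
        timeout=timeout,
        check=True,  # raise CalledProcessError on a non-zero exit code
    )
    return json.loads(completed.stdout)


# Stand-in worker for demonstration: a plain interpreter sorting a JSON list.
# In the workaround described above this would be the separate executable.
demo_worker = [
    sys.executable,
    "-c",
    "import json, sys; print(json.dumps(sorted(json.load(sys.stdin))))",
]
result = call_worker(demo_worker, [3, 1, 2])
```

Inside a built Flet app sys.executable would point at the app itself, so the command would have to name the helper executable explicitly.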

Question:

  • Is multiprocessing support likely to make it into Flet 0.25? I'm hesitant to create the workaround: it will be cumbersome to maintain and, ultimately, I'll have to reverse it once multiprocessing is up and running.

Thanks for the support on this - appreciate all the effort going into this on your side.

import logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("flet_core").setLevel(logging.WARNING) 
logging.getLogger("flet").setLevel(logging.WARNING) 



import os
import sys

argv = str(sys.argv)
logging.debug(f"sys.argv: {argv}")
orig_argv = str(sys.orig_argv)
logging.debug(f"sys.orig_argv: {orig_argv}")
logging.debug(f"env vars: {os.environ}")

import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Queue

import numpy as np
import csv
import os

def sort_sublist(sublist, sort_number):
    """Sorts a sublist of numbers and saves it to a CSV file in the specified folder."""
    ##### ! edit output folder ! ####
    output_folder = "/Users/alexproctor/Documents/CSV_OUTPUT"
    ##### ! edit output folder ! ####

    sorted_sublist = sorted(sublist)
    filename = os.path.join(output_folder, f"sorted_list_{sort_number}.csv")
    with open(filename, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(sorted_sublist) 
    return filename

def parallel_sort(queue_var):
    num_elements = 10_000_000
    data = np.random.rand(num_elements).tolist()
    number_of_sorts = 100

    with ProcessPoolExecutor(max_workers=1) as executor:
        futures = [
            executor.submit(sort_sublist, data, i) for i in range(number_of_sorts)
        ]  
        for future in as_completed(futures):
            filename = future.result()
            print(f"Sorted list saved to {filename}")
            queue_var.put(1)


def main(page):
    import flet as ft

    page.title = "Flet Background Task Example"
    queue_var = Queue()
    text = ft.Text("Init")

    def update_text():
        completed = 0
        while True:
            completed += queue_var.get()
            text.value = completed
            text.update()

    def on_click(e):
        page.run_thread(parallel_sort, queue_var)
        update_text()

    def window_event(e):
        print(e)
        if e.data == "close":
            sys.exit(0)
            page.window.destroy()

    page.window.prevent_close = True
    page.window.on_event = window_event

    page.add(
        ft.ElevatedButton("Start Loop", on_click=on_click),
    )
    page.add(
        ft.ElevatedButton("Quit app", on_click=lambda _: sys.exit(0)),
    )
    page.add(text)
    page.add(ft.Text(f"sys.argv: {str(sys.argv)}"))
    page.add(ft.Text(os.getenv("FLET_HIDE_WINDOW_ON_START")))


if __name__ == "__main__":

    multiprocessing.freeze_support()
    logging.debug("sys.argv: %s", sys.argv)  # placeholder needed, or logging reports a formatting error

    c_arg = "-c"
    c_arg_provided = False
    if c_arg in sys.argv:
        c_arg_idx = sys.argv.index(c_arg)
        if c_arg_idx < len(sys.argv) - 1:
            c_arg_provided = True
            exec(sys.argv[c_arg_idx + 1])
            logging.debug("sys.argv[c_arg_idx + 1]: %s", sys.argv[c_arg_idx + 1])

    if not c_arg_provided:
        os.environ["FLET_HIDE_APP_ON_START"] = "true"
        import flet as ft

        ft.app(target=main)

@FeodorFitsner
Contributor

Answering your question: most of the work to support multiprocessing is done in the flet build template project, not in the Flet package itself, so we could look into that further once 0.25 is released. It's independent work.
