Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public class PythonProcess {
private final Map<String, Boolean> processorPrefersIsolation = new ConcurrentHashMap<>();
private final Set<CreatedProcessor> createdProcessors = new CopyOnWriteArraySet<>();
private volatile boolean shutdown = false;
private volatile boolean ready = false;
private volatile boolean startFailed = false;
private volatile List<String> extensionDirs;
private volatile String workDir;
private Thread logReaderThread;
Expand All @@ -99,79 +101,132 @@ PythonController getCurrentController() {
return controller;
}

public synchronized void start() throws IOException {
final ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
final SocketFactory socketFactory = SocketFactory.getDefault();

final int timeoutMillis = (int) processConfig.getCommsTimeout().toMillis();
final String authToken = generateAuthToken();
final CallbackClient callbackClient = new CallbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), authToken,
50000L, TimeUnit.MILLISECONDS, socketFactory, false, timeoutMillis);

final JavaObjectBindings bindings = new JavaObjectBindings();
gateway = new NiFiPythonGateway(bindings, null, callbackClient);
gateway.startup();

server = new NiFiGatewayServer(gateway,
0,
GatewayServer.defaultAddress(),
timeoutMillis,
timeoutMillis,
Collections.emptyList(),
serverSocketFactory,
authToken,
componentType,
componentId);
server.start();

final int listeningPort = server.getListeningPort();

setupEnvironment();
this.process = launchPythonProcess(listeningPort, authToken);
this.process.onExit().thenAccept(this::handlePythonProcessDied);

final String logReaderThreadName = LOG_READER_THREAD_NAME_FORMAT.formatted(process.pid());
final Runnable logReaderCommand = new PythonProcessLogReader(process.inputReader(StandardCharsets.UTF_8));
this.logReaderThread = Thread.ofVirtual().name(logReaderThreadName).start(logReaderCommand);

final StandardPythonClient pythonClient = new StandardPythonClient(gateway);
controller = pythonClient.getController();

final long timeout = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60L);
Exception lastException = null;
boolean pingSuccessful = false;
while (System.currentTimeMillis() < timeout) {
try {
final String pingResponse = controller.ping();
pingSuccessful = "pong".equals(pingResponse);
/**
* Returns true if this process has completed start() successfully and is ready to create processors.
* Used by StandardPythonBridge so other threads can skip not-yet-ready processes or wait for them.
*/
public boolean isReady() {
return ready;
}

if (pingSuccessful) {
break;
} else {
logger.debug("Got unexpected response from Py4J Server during ping: {}", pingResponse);
}
} catch (final Exception e) {
lastException = e;
logger.debug("Failed to start Py4J Server", e);
/**
* Blocks until this process is ready (start() completed successfully) or the process is shut down.
* Callers must invoke this outside any bridge lock to avoid deadlock.
* @throws InterruptedException if the current thread is interrupted while waiting
* @throws IOException if the process was shut down before becoming ready
*/
public void waitUntilReady() throws InterruptedException, IOException {
synchronized (this) {
while (!ready && !shutdown && !startFailed) {
wait();
}

try {
Thread.sleep(50L);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
return;
if (shutdown && !ready) {
throw new IOException("Python Process [%s] was shut down before becoming ready".formatted(componentId));
}
if (startFailed && !ready) {
throw new IOException("Python Process [%s] failed to start".formatted(componentId));
}
}
}

if (!pingSuccessful && lastException != null) {
throw new RuntimeException("Failed to start Python Bridge", lastException);
void markReadyAndNotify() {
synchronized (this) {
ready = true;
notifyAll();
}
}

logListenerId = Long.toString(process.pid());
StandardLogLevelChangeHandler.getHandler().addListener(logListenerId, new PythonProcessLogLevelChangeListener());
private void notifyWaiters() {
synchronized (this) {
notifyAll();
}
}

controller.setControllerServiceTypeLookup(controllerServiceTypeLookup);
logger.info("Successfully started and pinged Python Server. Python Process = {}", process);
public synchronized void start() throws IOException {
ready = false;
startFailed = false;
try {
final ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
final SocketFactory socketFactory = SocketFactory.getDefault();

final int timeoutMillis = (int) processConfig.getCommsTimeout().toMillis();
final String authToken = generateAuthToken();
final CallbackClient callbackClient = new CallbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), authToken,
50000L, TimeUnit.MILLISECONDS, socketFactory, false, timeoutMillis);

final JavaObjectBindings bindings = new JavaObjectBindings();
gateway = new NiFiPythonGateway(bindings, null, callbackClient);
gateway.startup();

server = new NiFiGatewayServer(gateway,
0,
GatewayServer.defaultAddress(),
timeoutMillis,
timeoutMillis,
Collections.emptyList(),
serverSocketFactory,
authToken,
componentType,
componentId);
server.start();

final int listeningPort = server.getListeningPort();

setupEnvironment();
this.process = launchPythonProcess(listeningPort, authToken);
this.process.onExit().thenAccept(this::handlePythonProcessDied);

final String logReaderThreadName = LOG_READER_THREAD_NAME_FORMAT.formatted(process.pid());
final Runnable logReaderCommand = new PythonProcessLogReader(process.inputReader(StandardCharsets.UTF_8));
this.logReaderThread = Thread.ofVirtual().name(logReaderThreadName).start(logReaderCommand);

final StandardPythonClient pythonClient = new StandardPythonClient(gateway);
controller = pythonClient.getController();

final long timeout = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60L);
Exception lastException = null;
boolean pingSuccessful = false;
while (System.currentTimeMillis() < timeout) {
try {
final String pingResponse = controller.ping();
pingSuccessful = "pong".equals(pingResponse);

if (pingSuccessful) {
break;
} else {
logger.debug("Got unexpected response from Py4J Server during ping: {}", pingResponse);
}
} catch (final Exception e) {
lastException = e;
logger.debug("Failed to start Py4J Server", e);
}

try {
Thread.sleep(50L);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
startFailed = true;
notifyWaiters();
return;
}
}

if (!pingSuccessful && lastException != null) {
throw new RuntimeException("Failed to start Python Bridge", lastException);
}

logListenerId = Long.toString(process.pid());
StandardLogLevelChangeHandler.getHandler().addListener(logListenerId, new PythonProcessLogLevelChangeListener());

controller.setControllerServiceTypeLookup(controllerServiceTypeLookup);
logger.info("Successfully started and pinged Python Server. Python Process = {}", process);
} catch (final Throwable t) {
startFailed = true;
notifyWaiters();
throw t;
} finally {
notifyWaiters();
}
}

private void handlePythonProcessDied(final Process process) {
Expand All @@ -198,6 +253,7 @@ private void handlePythonProcessDied(final Process process) {
// Ensure that we re-discover any extensions, as this is necessary in order to create Processors
if (extensionDirs != null && workDir != null) {
discoverExtensions(extensionDirs, workDir);
markReadyAndNotify();
recreateProcessors();
}

Expand Down Expand Up @@ -423,6 +479,7 @@ public boolean isShutdown() {

public void shutdown() {
shutdown = true;
notifyWaiters();
logger.info("Shutting down Python Process {}", process);
killProcess();
}
Expand Down
Loading
Loading