Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 50 additions & 29 deletions go-manual/modules/ROOT/pages/concurrency.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
"github.com/neo4j/neo4j-go-driver/v6/neo4j"
)

type item struct {
record *neo4j.Record
err error
}

func main() {
ctx := context.Background()

Expand Down Expand Up @@ -65,58 +70,74 @@ func main() {
}
}

func queryToChannel(ctx context.Context, driver neo4j.Driver) chan *neo4j.Record {
recordsC := make(chan *neo4j.Record, 10) // <2>
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "<database-name>"})
defer session.Close(ctx)
go session.ExecuteWrite(ctx,
func(tx neo4j.ManagedTransaction) (any, error) {
// Neo4j query to create and retrieve some nodes
result, err := tx.Run(ctx, `
UNWIND range(1,25) AS id
MERGE (p:Person {id: id})
RETURN p
`, nil)
if err != nil {
panic(err)
}
func queryToChannel(ctx context.Context, driver neo4j.DriverWithContext) chan item {
recordsC := make(chan item, 10) // <2>
go func() {
defer close(recordsC)
session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "<database-name>"})
defer session.Close(ctx)
// Cypher query to create and retrieve some nodes
result, err := session.Run(ctx, `
UNWIND range(1,25) AS id
MERGE (p:Person {id: id})
RETURN p
`, nil)
if err != nil {
var it item
it.record, it.err = nil, err
recordsC <- it
} else {
// Stream results to channel as they come from the server
for result.Next(ctx) { // <3>
for result.Next(ctx) { // <3>
record := result.Record()
recordsC <- record
var it item
it.record, it.err = record, nil
recordsC <- it
}
close(recordsC)
return nil, err
})
// If the stream interrupts, send the error through the channel
if result.Err() != nil {
var it item
it.record, it.err = nil, err
recordsC <- it
}
}
}()
return recordsC
}

func consumer(wg *sync.WaitGroup, records <-chan *neo4j.Record, log chan string, n int) {
func consumer(wg *sync.WaitGroup, records <-chan item, log chan string, n int) {
defer wg.Done() // will communicate that routine is done
for record := range records {
log <- fmt.Sprintf("Receiver %v processed %v", n, record)
time.Sleep(time.Duration(n) * time.Second) // proxy for a time-consuming processing
for it := range records {
if it.err != nil {
fmt.Println("ERROR:", it.err)
} else {
log <- fmt.Sprintf("Receiver %v processed %v", n, it.record)
time.Sleep(time.Duration(n) * time.Second) // proxy for a time-consuming processing
}
}
}
----

<1> A Goroutine runs the query to the Neo4j server with a xref:transactions.adoc#managed-transactions[managed transaction].
Notice that the driver session is created _inside_ the routine, as sessions are not thread-safe.
<2> The channel `recordsC` is where the query result records get streamed to.
The transaction function from `.ExecuteWrite()` writes to it, and the various ``consumer``s read from it.
<1> A Goroutine runs the query to the Neo4j server with an xref:query-advanced.adoc#implicit-transactions[implicit transaction].
Notice that the driver xref:transactions.adoc#create-session[session] is created _inside_ the routine, as sessions are not thread-safe.
<2> The channel `recordsC` is where the query result records get streamed to, and the various ``consumer``s read from it.
It is buffered so that the driver does not retrieve records faster than what the consumers can handle.
<3> Each result record coming from the server is sent over the `recordsC` channel.
The streaming continues so long as there are records to be processed, after which the channel gets closed and the routine exits.
Note that an error might occur while streaming, and that must also be sent over the channel.
<4> The channel `log` is where the consumers comunicate on.
<5> A `sync.WaitGroup` is needed to know when all consumers are done, and thus the `log` channel can be closed.
<6> A number of ``consumer``s get started in separate Goroutines.
Each consumer reads and processes records from the `recordsC` channel.
Each consumer simulates a lengthy operation with a sleeping timer.

[IMPORTANT]
streaming, idemptency


== Concurrent run of multiple queries (using `ExecuteQuery()`)

The following example shows how you can run multiple queries concurrently.
The following example shows how you can execute multiple `ExecuteQuery()` calls concurrently.

[source, go]
----
Expand Down
1 change: 1 addition & 0 deletions go-manual/modules/ROOT/pages/transactions.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ For these more advanced use-cases, the driver provides functions to manually con
The most common form is _managed transactions_, and you can think of them as a way of unwrapping the flow of `ExecutableQuery()` and being able to specify its desired behavior in more places.


[#create-session]
== Create a session

Before running a transaction, you need to obtain a _session_.
Expand Down