-
Notifications
You must be signed in to change notification settings - Fork 335
feat(electric): Add a new write-to-pg mode that applies changes directly to Postgres #698
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
621f2f7
Introduce the Postgres.Writer module for streaming client writes to PG
alco f818bac
Introduce the ability to switch between streaming and immediate write…
alco 0aa5a0b
Update acknowledge_client_lsns directly in the immediate write mode
alco 8dfdc04
Use a custom setting to control which triggers are invoked for Electr…
alco 62ed179
Make E2E test work in the immediate write mode
alco 679a2f9
Run E2E tests for both write modes on CI
alco ed6ed2c
Rename ELECTRIC_WRITE_MODE to ELECTRIC_WRITE_TO_PG_MODE
alco fd1b984
Only start Replication.Postgres.TcpServer in the logical_replication …
alco d66fa84
Log connection settings before attempting to establish a replication …
alco 015edf0
Replace GenStage.async_subscribe() with [subscribe_to: ...]
alco c6b2c2f
Use Enum.map_join
alco 7a5bf8f
Explain the nature and purpose of the custom electric.__session_repli…
alco 93b2ba2
Remove superfluous WHERE clause
alco 9b9fd22
fixup! Use a custom setting to control which triggers are invoked for…
alco edd6d33
Add an internal migration to convert existing replica triggers to ENA…
alco 7372694
Stop PG subscription in case one exists and is active
alco d09c8bc
Remove unused "prisma_support" test tag
alco a922ec3
Make Electric.Extension.Case more flexible
alco 8e81a31
Clean up the trigger conversion migration code
alco c950eb1
Replace "electric" with schema in the trigger conversion migration
alco 5f4c21a
Move the definition of electric.install_functions_and_triggers() to r…
alco 9a5aff6
fixup! Stop PG subscription in case one exists and is active
alco File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
19 changes: 19 additions & 0 deletions
19
...nts/electric/lib/electric/postgres/extension/functions/__session_replication_role.sql.eex
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| -- This function augments the builtin [`session_replication_role`][1] setting to emulate its behaviour for the direct_writes mode. | ||
| -- | ||
| -- With ELECTRIC_WRITE_TO_PG_MODE is set to `logical_replication`, the value of the builtin setting is looked up. On the | ||
| -- Electric->PG replication connection its value is `replica`, whereas on regular connections to Postgres it is | ||
| -- `origin`. | ||
| -- | ||
| -- When ELECTRIC_WRITE_TO_PG_MODE is set to `direct_writes`, the value of the [custom `electric.session_replication_role` option][2] | ||
| -- is looked up. It is used to emulate the same Electric->PG trigger activation behaviour even though a regular connection is | ||
| -- used to apply writes to Postgres in this write mode. | ||
| -- | ||
| -- [1]: https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-SESSION-REPLICATION-ROLE | ||
| -- [2]: https://www.postgresql.org/docs/14/runtime-config-custom.html | ||
| CREATE OR REPLACE FUNCTION <%= @schema %>.__session_replication_role(OUT role text) AS $$ | ||
| BEGIN | ||
| SELECT INTO role current_setting('electric.session_replication_role'); | ||
| EXCEPTION WHEN undefined_object THEN | ||
| SELECT INTO role current_setting('session_replication_role'); | ||
| END; | ||
| $$ LANGUAGE PLPGSQL; | ||
84 changes: 84 additions & 0 deletions
84
...electric/lib/electric/postgres/extension/functions/install_functions_and_triggers.sql.eex
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| CREATE OR REPLACE FUNCTION <%= @schema %>.install_functions_and_triggers(schema_name TEXT, table_name TEXT, primary_key_list TEXT[], non_pk_column_list TEXT[]) | ||
| RETURNS VOID | ||
| LANGUAGE PLPGSQL | ||
| AS $function$ | ||
| DECLARE | ||
| shadow_table_name TEXT := 'shadow__' || schema_name || '__' || table_name; | ||
| full_table_identifier TEXT := format('%I.%I', schema_name, table_name); | ||
| generated_functions JSONB; | ||
| BEGIN | ||
| -- Install function to be used in the triggers | ||
| generated_functions := <%= @schema %>.install_conflict_resolution_functions(schema_name, table_name, primary_key_list, non_pk_column_list); | ||
|
|
||
| -- Install actual triggers | ||
| EXECUTE format($$ | ||
| CREATE OR REPLACE TRIGGER as_first__save_deleted_rows_to_tombstone_table | ||
| AFTER DELETE ON %s | ||
| FOR EACH ROW | ||
| EXECUTE PROCEDURE <%= @schema %>.%I() | ||
| $$, full_table_identifier, generated_functions->>'generate_tombstone_entry'); | ||
| EXECUTE format($$ ALTER TABLE %s ENABLE ALWAYS TRIGGER as_first__save_deleted_rows_to_tombstone_table $$, full_table_identifier); | ||
|
|
||
| EXECUTE format($$ | ||
| CREATE OR REPLACE TRIGGER postgres_write__upsert_generate_shadow_rows | ||
| BEFORE INSERT OR UPDATE ON %s | ||
| FOR EACH ROW | ||
| WHEN (<%= @schema %>.__session_replication_role() <> 'replica') | ||
| EXECUTE PROCEDURE <%= @schema %>.%I(); | ||
| $$, full_table_identifier, generated_functions->>'create_shadow_row_from_upsert'); | ||
|
|
||
| EXECUTE format($$ ALTER TABLE %s ENABLE TRIGGER postgres_write__upsert_generate_shadow_rows $$, full_table_identifier); | ||
|
|
||
| EXECUTE format($$ | ||
| CREATE OR REPLACE TRIGGER postgres_write__delete_generate_shadow_rows | ||
| BEFORE DELETE ON %s | ||
| FOR EACH ROW | ||
| WHEN (<%= @schema %>.__session_replication_role() <> 'replica') | ||
| EXECUTE PROCEDURE <%= @schema %>.%I(); | ||
| $$, full_table_identifier, generated_functions->>'update_shadow_row_from_delete'); | ||
|
|
||
| EXECUTE format($$ ALTER TABLE %s ENABLE TRIGGER postgres_write__delete_generate_shadow_rows $$, full_table_identifier); | ||
|
|
||
| EXECUTE format($$ DROP TRIGGER IF EXISTS postgres_write__write_resolved_tags ON <%= @schema %>.%I $$, shadow_table_name); | ||
| EXECUTE format($$ | ||
| CREATE CONSTRAINT TRIGGER postgres_write__write_resolved_tags | ||
| AFTER UPDATE ON <%= @schema %>.%I | ||
| DEFERRABLE INITIALLY DEFERRED | ||
| FOR EACH ROW | ||
| WHEN (<%= @schema %>.__session_replication_role() <> 'replica' AND NOT NEW._resolved) | ||
| EXECUTE PROCEDURE <%= @schema %>.%I(); | ||
| $$, shadow_table_name, generated_functions->>'write_correct_max_tag'); | ||
|
|
||
| EXECUTE format($$ ALTER TABLE <%= @schema %>.%I ENABLE TRIGGER postgres_write__write_resolved_tags $$, shadow_table_name); | ||
|
|
||
| EXECUTE format($$ | ||
| CREATE OR REPLACE TRIGGER satellite_write__upsert_rows | ||
| BEFORE INSERT ON <%= @schema %>.%I | ||
| FOR EACH ROW | ||
| WHEN (<%= @schema %>.__session_replication_role() = 'replica' AND pg_trigger_depth() < 1 AND NEW._currently_reordering IS NULL) | ||
| EXECUTE PROCEDURE <%= @schema %>.%I(); | ||
| $$, shadow_table_name, generated_functions->>'shadow_insert_to_upsert'); | ||
|
|
||
| EXECUTE format($$ ALTER TABLE <%= @schema %>.%I ENABLE ALWAYS TRIGGER satellite_write__upsert_rows $$, shadow_table_name); | ||
|
|
||
| EXECUTE format($$ | ||
| CREATE OR REPLACE TRIGGER satellite_write__resolve_observed_tags | ||
| BEFORE UPDATE ON <%= @schema %>.%I | ||
| FOR EACH ROW | ||
| WHEN (<%= @schema %>.__session_replication_role() = 'replica' AND NEW._currently_reordering IS NULL) | ||
| EXECUTE PROCEDURE <%= @schema %>.%I(); | ||
| $$, shadow_table_name, generated_functions->>'resolve_observed_tags'); | ||
|
|
||
| EXECUTE format($$ ALTER TABLE <%= @schema %>.%I ENABLE ALWAYS TRIGGER satellite_write__resolve_observed_tags $$, shadow_table_name); | ||
|
|
||
| EXECUTE format($$ | ||
| CREATE OR REPLACE TRIGGER satellite_write__save_operation_for_reordering | ||
| BEFORE INSERT OR UPDATE ON %s | ||
| FOR EACH ROW | ||
| WHEN (<%= @schema %>.__session_replication_role() = 'replica' AND pg_trigger_depth() < 1) | ||
| EXECUTE PROCEDURE <%= @schema %>.%I(); | ||
| $$, full_table_identifier, generated_functions->>'reorder_main_op'); | ||
|
|
||
| EXECUTE format($$ ALTER TABLE %s ENABLE ALWAYS TRIGGER satellite_write__save_operation_for_reordering $$, full_table_identifier); | ||
| END | ||
| $function$; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
31 changes: 31 additions & 0 deletions
31
...ectric/postgres/extension/migrations/20231206130400_convert_replica_triggers_to_always.ex
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| defmodule Electric.Postgres.Extension.Migrations.Migration_20231206130400_ConvertReplicaTriggersToAlways do | ||
| alias Electric.Postgres.Extension | ||
|
|
||
| @behaviour Extension.Migration | ||
|
|
||
| sql_file = | ||
| Path.expand( | ||
| "20231206130400_convert_replica_triggers_to_always/replace_replica_triggers.sql", | ||
| __DIR__ | ||
| ) | ||
|
|
||
| @external_resource sql_file | ||
|
|
||
| @migration_sql File.read!(sql_file) | ||
|
|
||
| @impl true | ||
| def version, do: 2023_12_06_13_04_00 | ||
|
|
||
| @impl true | ||
| def up(schema) do | ||
| [ | ||
| Extension.Functions.by_name(:__session_replication_role), | ||
| String.replace(@migration_sql, "electric", schema) | ||
| ] | ||
| end | ||
|
|
||
| @impl true | ||
| def down(_schema) do | ||
| [] | ||
| end | ||
| end |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.