Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
621f2f7
Introduce the Postgres.Writer module for streaming client writes to PG
alco Nov 21, 2023
f818bac
Introduce the ability to switch between streaming and immediate write…
alco Oct 24, 2023
0aa5a0b
Update acknowledge_client_lsns directly in the immediate write mode
alco Nov 24, 2023
8dfdc04
Use a custom setting to control which triggers are invoked for Electr…
alco Nov 22, 2023
62ed179
Make E2E test work in the immediate write mode
alco Nov 28, 2023
679a2f9
Run E2E tests for both write modes on CI
alco Nov 28, 2023
ed6ed2c
Rename ELECTRIC_WRITE_MODE to ELECTRIC_WRITE_TO_PG_MODE
alco Nov 28, 2023
fd1b984
Only start Replication.Postgres.TcpServer in the logical_replication …
alco Nov 29, 2023
d66fa84
Log connection settings before attempting to establish a replication …
alco Nov 29, 2023
015edf0
Replace GenStage.async_subscribe() with [subscribe_to: ...]
alco Dec 5, 2023
c6b2c2f
Use Enum.map_join
alco Dec 5, 2023
7a5bf8f
Explain the nature and purpose of the custom electric.__session_repli…
alco Dec 5, 2023
93b2ba2
Remove superfluous WHERE clause
alco Dec 5, 2023
9b9fd22
fixup! Use a custom setting to control which triggers are invoked for…
alco Dec 6, 2023
edd6d33
Add an internal migration to convert existing replica triggers to ENA…
alco Dec 6, 2023
7372694
Stop PG subscription in case one exists and is active
alco Dec 6, 2023
d09c8bc
Remove unused "prisma_support" test tag
alco Dec 6, 2023
a922ec3
Make Electric.Extension.Case more flexible
alco Dec 6, 2023
8e81a31
Clean up the trigger conversion migration code
alco Dec 6, 2023
c950eb1
Replace "electric" with schema in the trigger conversion migration
alco Dec 6, 2023
5f4c21a
Move the definition of electric.install_functions_and_triggers() to r…
alco Dec 6, 2023
9a5aff6
fixup! Stop PG subscription in case one exists and is active
alco Dec 7, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ on:
pull_request:
paths-ignore:
# Root files
- '*'
- '!pnpm-lock.yaml'
- "*"
- "!pnpm-lock.yaml"
# CI files not related to GH actions
- '.buildkite/**'
- '**/README.md'
- 'docs/**'
- ".buildkite/**"
- "**/README.md"
- "docs/**"

env:
OTP_VERSION: "25.3"
Expand All @@ -25,11 +25,15 @@ jobs:
e2e_tests:
name: E2E tests
runs-on: electric-e2e-8-32
strategy:
matrix:
write_to_pg_mode: [logical_replication, direct_writes]
defaults:
run:
working-directory: e2e
env:
BUILDKITE_ANALYTICS_TOKEN: ${{ secrets.BUILDKITE_TEST_ANALYTICS_E2E }}
ELECTRIC_WRITE_TO_PG_MODE: ${{ matrix.write_to_pg_mode }}
steps:
- uses: actions/checkout@v3
with:
Expand Down
16 changes: 13 additions & 3 deletions components/electric/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ default_pg_proxy_port = "65432"
default_listen_on_ipv6 = "false"
default_database_require_ssl = "false"
default_database_use_ipv6 = "false"
default_write_to_pg_mode = "logical_replication"

###

Expand Down Expand Up @@ -78,12 +79,19 @@ config :logger, :console,
pg_server_port = get_env_int.("LOGICAL_PUBLISHER_PORT", default_pg_server_port)
listen_on_ipv6? = get_env_bool.("ELECTRIC_USE_IPV6", default_listen_on_ipv6)

write_to_pg_mode =
case System.get_env("ELECTRIC_WRITE_TO_PG_MODE", default_write_to_pg_mode) do
"logical_replication" -> :logical_replication
"direct_writes" -> :direct_writes
end

config :electric,
# Used in telemetry, and to identify the server to the client
instance_id: System.get_env("ELECTRIC_INSTANCE_ID", Electric.Utils.uuid4()),
http_port: get_env_int.("HTTP_PORT", default_http_server_port),
pg_server_port: pg_server_port,
listen_on_ipv6?: listen_on_ipv6?
listen_on_ipv6?: listen_on_ipv6?,
write_to_pg_mode: write_to_pg_mode

config :electric, Electric.Replication.Postgres,
pg_client: Electric.Replication.Postgres.Client,
Expand Down Expand Up @@ -127,8 +135,10 @@ if config_env() == :prod do
|> Keyword.put(:replication, "database")

pg_server_host =
System.get_env("LOGICAL_PUBLISHER_HOST") ||
raise("Required environment variable LOGICAL_PUBLISHER_HOST is not set")
if write_to_pg_mode == :logical_replication do
System.get_env("LOGICAL_PUBLISHER_HOST") ||
raise("Required environment variable LOGICAL_PUBLISHER_HOST is not set")
end

proxy_port = get_env_int.("PG_PROXY_PORT", default_pg_proxy_port)

Expand Down
4 changes: 4 additions & 0 deletions components/electric/lib/electric.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Electric do
@moduledoc false

@type reg_name :: {:via, :gproc, {:n, :l, term()}}
@type write_to_pg_mode :: :logical_replication | :direct_writes

@doc """
Register process with the given name
Expand Down Expand Up @@ -99,4 +100,7 @@ defmodule Electric do
def vsn do
@current_vsn
end

@spec write_to_pg_mode :: write_to_pg_mode
def write_to_pg_mode, do: Application.fetch_env!(:electric, :write_to_pg_mode)
end
21 changes: 16 additions & 5 deletions components/electric/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,21 @@ defmodule Electric.Application do
Electric.Postgres.Proxy.SASL.SCRAMLockedCache,
Electric.Satellite.SubscriptionManager,
Electric.Satellite.ClientManager,
Electric.Replication.Connectors,
{ThousandIsland,
[port: pg_server_port(), handler_module: Electric.Replication.Postgres.TcpServer] ++
listener_opts()}
Electric.Replication.Connectors
]

children =
children ++
if Electric.write_to_pg_mode() == :logical_replication do
[
{ThousandIsland,
[port: pg_server_port(), handler_module: Electric.Replication.Postgres.TcpServer] ++
listener_opts()}
]
else
[]
end

children =
children ++
unless Application.get_env(:electric, :disable_listeners, false) do
Expand All @@ -41,7 +50,9 @@ defmodule Electric.Application do
|> Enum.each(fn {name, config} ->
Connectors.start_connector(
PostgresConnector,
Keyword.put(config, :origin, to_string(name))
config
|> Keyword.put(:origin, to_string(name))
|> Keyword.put(:write_to_pg_mode, Electric.write_to_pg_mode())
)
end)

Expand Down
3 changes: 2 additions & 1 deletion components/electric/lib/electric/postgres/extension.ex
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ defmodule Electric.Postgres.Extension do
Migrations.Migration_20230921161418_ProxyCompatibility,
Migrations.Migration_20231009121515_AllowLargeMigrations,
Migrations.Migration_20231010123118_AddPriorityToVersion,
Migrations.Migration_20231016141000_ConvertFunctionToProcedure
Migrations.Migration_20231016141000_ConvertFunctionToProcedure,
Migrations.Migration_20231206130400_ConvertReplicaTriggersToAlways
]
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This function augments the builtin [`session_replication_role`][1] setting to emulate its behaviour for the direct_writes mode.
--
-- With ELECTRIC_WRITE_TO_PG_MODE is set to `logical_replication`, the value of the builtin setting is looked up. On the
-- Electric->PG replication connection its value is `replica`, whereas on regular connections to Postgres it is
-- `origin`.
--
-- When ELECTRIC_WRITE_TO_PG_MODE is set to `direct_writes`, the value of the [custom `electric.session_replication_role` option][2]
-- is looked up. It is used to emulate the same Electric->PG trigger activation behaviour even though a regular connection is
-- used to apply writes to Postgres in this write mode.
--
-- [1]: https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-SESSION-REPLICATION-ROLE
-- [2]: https://www.postgresql.org/docs/14/runtime-config-custom.html
CREATE OR REPLACE FUNCTION <%= @schema %>.__session_replication_role(OUT role text) AS $$
BEGIN
SELECT INTO role current_setting('electric.session_replication_role');
EXCEPTION WHEN undefined_object THEN
SELECT INTO role current_setting('session_replication_role');
Comment thread
alco marked this conversation as resolved.
END;
$$ LANGUAGE PLPGSQL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
CREATE OR REPLACE FUNCTION <%= @schema %>.install_functions_and_triggers(schema_name TEXT, table_name TEXT, primary_key_list TEXT[], non_pk_column_list TEXT[])
RETURNS VOID
LANGUAGE PLPGSQL
AS $function$
DECLARE
shadow_table_name TEXT := 'shadow__' || schema_name || '__' || table_name;
full_table_identifier TEXT := format('%I.%I', schema_name, table_name);
generated_functions JSONB;
BEGIN
-- Install function to be used in the triggers
generated_functions := <%= @schema %>.install_conflict_resolution_functions(schema_name, table_name, primary_key_list, non_pk_column_list);

-- Install actual triggers
EXECUTE format($$
CREATE OR REPLACE TRIGGER as_first__save_deleted_rows_to_tombstone_table
AFTER DELETE ON %s
FOR EACH ROW
EXECUTE PROCEDURE <%= @schema %>.%I()
$$, full_table_identifier, generated_functions->>'generate_tombstone_entry');
EXECUTE format($$ ALTER TABLE %s ENABLE ALWAYS TRIGGER as_first__save_deleted_rows_to_tombstone_table $$, full_table_identifier);

EXECUTE format($$
CREATE OR REPLACE TRIGGER postgres_write__upsert_generate_shadow_rows
BEFORE INSERT OR UPDATE ON %s
FOR EACH ROW
WHEN (<%= @schema %>.__session_replication_role() <> 'replica')
EXECUTE PROCEDURE <%= @schema %>.%I();
$$, full_table_identifier, generated_functions->>'create_shadow_row_from_upsert');

EXECUTE format($$ ALTER TABLE %s ENABLE TRIGGER postgres_write__upsert_generate_shadow_rows $$, full_table_identifier);

EXECUTE format($$
CREATE OR REPLACE TRIGGER postgres_write__delete_generate_shadow_rows
BEFORE DELETE ON %s
FOR EACH ROW
WHEN (<%= @schema %>.__session_replication_role() <> 'replica')
EXECUTE PROCEDURE <%= @schema %>.%I();
$$, full_table_identifier, generated_functions->>'update_shadow_row_from_delete');

EXECUTE format($$ ALTER TABLE %s ENABLE TRIGGER postgres_write__delete_generate_shadow_rows $$, full_table_identifier);

EXECUTE format($$ DROP TRIGGER IF EXISTS postgres_write__write_resolved_tags ON <%= @schema %>.%I $$, shadow_table_name);
EXECUTE format($$
CREATE CONSTRAINT TRIGGER postgres_write__write_resolved_tags
AFTER UPDATE ON <%= @schema %>.%I
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
WHEN (<%= @schema %>.__session_replication_role() <> 'replica' AND NOT NEW._resolved)
EXECUTE PROCEDURE <%= @schema %>.%I();
$$, shadow_table_name, generated_functions->>'write_correct_max_tag');

EXECUTE format($$ ALTER TABLE <%= @schema %>.%I ENABLE TRIGGER postgres_write__write_resolved_tags $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__upsert_rows
BEFORE INSERT ON <%= @schema %>.%I
FOR EACH ROW
WHEN (<%= @schema %>.__session_replication_role() = 'replica' AND pg_trigger_depth() < 1 AND NEW._currently_reordering IS NULL)
EXECUTE PROCEDURE <%= @schema %>.%I();
$$, shadow_table_name, generated_functions->>'shadow_insert_to_upsert');

EXECUTE format($$ ALTER TABLE <%= @schema %>.%I ENABLE ALWAYS TRIGGER satellite_write__upsert_rows $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__resolve_observed_tags
BEFORE UPDATE ON <%= @schema %>.%I
FOR EACH ROW
WHEN (<%= @schema %>.__session_replication_role() = 'replica' AND NEW._currently_reordering IS NULL)
EXECUTE PROCEDURE <%= @schema %>.%I();
$$, shadow_table_name, generated_functions->>'resolve_observed_tags');

EXECUTE format($$ ALTER TABLE <%= @schema %>.%I ENABLE ALWAYS TRIGGER satellite_write__resolve_observed_tags $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__save_operation_for_reordering
BEFORE INSERT OR UPDATE ON %s
FOR EACH ROW
WHEN (<%= @schema %>.__session_replication_role() = 'replica' AND pg_trigger_depth() < 1)
EXECUTE PROCEDURE <%= @schema %>.%I();
$$, full_table_identifier, generated_functions->>'reorder_main_op');

EXECUTE format($$ ALTER TABLE %s ENABLE ALWAYS TRIGGER satellite_write__save_operation_for_reordering $$, full_table_identifier);
END
$function$;
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230512000000_confli
# This function definition is included here because it is referenced in the definition of
# "trigger_function_installers" below it.
Extension.Functions.by_name(:perform_reordered_op_installer_function),
Extension.Functions.by_name(:__session_replication_role),
@contents["trigger_function_installers"],
@contents["shadow_table_creation_and_update"]
# We need to actually run shadow table creation/updates, but that's handled in the next migration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,86 +482,3 @@ BEGIN
RETURN function_names;
END
$function$;

CREATE OR REPLACE FUNCTION electric.install_functions_and_triggers(schema_name TEXT, table_name TEXT, primary_key_list TEXT[], non_pk_column_list TEXT[])
RETURNS VOID
LANGUAGE PLPGSQL
AS $function$
DECLARE
shadow_table_name TEXT := 'shadow__' || schema_name || '__' || table_name;
full_table_identifier TEXT := format('%I.%I', schema_name, table_name);
generated_functions JSONB;
BEGIN
-- Install function to be used in the triggers
generated_functions := electric.install_conflict_resolution_functions(schema_name, table_name, primary_key_list, non_pk_column_list);

-- Install actual triggers
EXECUTE format($$
CREATE OR REPLACE TRIGGER as_first__save_deleted_rows_to_tombstone_table
AFTER DELETE ON %s
FOR EACH ROW
EXECUTE PROCEDURE electric.%I()
$$, full_table_identifier, generated_functions->>'generate_tombstone_entry');
EXECUTE format($$ ALTER TABLE %s ENABLE ALWAYS TRIGGER as_first__save_deleted_rows_to_tombstone_table $$, full_table_identifier);

EXECUTE format($$
CREATE OR REPLACE TRIGGER postgres_write__upsert_generate_shadow_rows
BEFORE INSERT OR UPDATE ON %s
FOR EACH ROW
EXECUTE PROCEDURE electric.%I();
$$, full_table_identifier, generated_functions->>'create_shadow_row_from_upsert');

EXECUTE format($$ ALTER TABLE %s ENABLE TRIGGER postgres_write__upsert_generate_shadow_rows $$, full_table_identifier);

EXECUTE format($$
CREATE OR REPLACE TRIGGER postgres_write__delete_generate_shadow_rows
BEFORE DELETE ON %s
FOR EACH ROW
EXECUTE PROCEDURE electric.%I();
$$, full_table_identifier, generated_functions->>'update_shadow_row_from_delete');

EXECUTE format($$ ALTER TABLE %s ENABLE TRIGGER postgres_write__delete_generate_shadow_rows $$, full_table_identifier);

EXECUTE format($$ DROP TRIGGER IF EXISTS postgres_write__write_resolved_tags ON electric.%I $$, shadow_table_name);
EXECUTE format($$
CREATE CONSTRAINT TRIGGER postgres_write__write_resolved_tags
AFTER UPDATE ON electric.%I
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
WHEN (NOT NEW._resolved)
EXECUTE PROCEDURE electric.%I();
$$, shadow_table_name, generated_functions->>'write_correct_max_tag');

EXECUTE format($$ ALTER TABLE electric.%I ENABLE TRIGGER postgres_write__write_resolved_tags $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__upsert_rows
BEFORE INSERT ON electric.%I
FOR EACH ROW
WHEN (pg_trigger_depth() < 1 AND NEW._currently_reordering IS NULL)
EXECUTE PROCEDURE electric.%I();
$$, shadow_table_name, generated_functions->>'shadow_insert_to_upsert');

EXECUTE format($$ ALTER TABLE electric.%I ENABLE REPLICA TRIGGER satellite_write__upsert_rows $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__resolve_observed_tags
BEFORE UPDATE ON electric.%I
FOR EACH ROW
WHEN (NEW._currently_reordering IS NULL)
EXECUTE PROCEDURE electric.%I();
$$, shadow_table_name, generated_functions->>'resolve_observed_tags');

EXECUTE format($$ ALTER TABLE electric.%I ENABLE REPLICA TRIGGER satellite_write__resolve_observed_tags $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__save_operation_for_reordering
BEFORE INSERT OR UPDATE ON %s
FOR EACH ROW
WHEN (pg_trigger_depth() < 1)
EXECUTE PROCEDURE electric.%I();
$$, full_table_identifier, generated_functions->>'reorder_main_op');

EXECUTE format($$ ALTER TABLE %s ENABLE REPLICA TRIGGER satellite_write__save_operation_for_reordering $$, full_table_identifier);
END
$function$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Electric.Postgres.Extension.Migrations.Migration_20231206130400_ConvertReplicaTriggersToAlways do
alias Electric.Postgres.Extension

@behaviour Extension.Migration

sql_file =
Path.expand(
"20231206130400_convert_replica_triggers_to_always/replace_replica_triggers.sql",
__DIR__
)

@external_resource sql_file

@migration_sql File.read!(sql_file)

@impl true
def version, do: 2023_12_06_13_04_00

@impl true
def up(schema) do
[
Extension.Functions.by_name(:__session_replication_role),
String.replace(@migration_sql, "electric", schema)
]
end

@impl true
def down(_schema) do
[]
end
end
Loading