Connectors

Connectors are adapters that let Acme read from sources and write to destinations. Each connector handles the specifics of a particular system, such as authentication, data formats, and pagination.

Available connectors

Sources

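| Connector     | Type          | Incremental  | Streaming |
|---------------|---------------|--------------|-----------|
| PostgreSQL    | postgres      | Yes          | Yes (CDC) |
| MySQL         | mysql         | Yes          | No        |
| MongoDB       | mongodb       | Yes          | Yes       |
| CSV           | csv           | No           | No        |
| JSON          | json          | No           | No        |
| Parquet       | parquet       | No           | No        |
| REST API      | rest          | Configurable | No        |
| Kafka         | kafka         | N/A          | Yes       |
| Redis Streams | redis_stream  | N/A          | Yes       |
| S3            | s3            | Yes          | No        |
| Google Sheets | google_sheets | No           | No        |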

Destinations

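| Connector     | Type          | Write modes                    |
|---------------|---------------|--------------------------------|
| BigQuery      | bigquery      | append, replace, upsert, merge |
| Snowflake     | snowflake     | append, replace, upsert        |
| PostgreSQL    | postgres      | append, replace, upsert        |
| S3            | s3            | append, replace                |
| JSON          | json          | append, replace                |
| Webhook       | webhook       | append                         |
| Elasticsearch | elasticsearch | append, upsert                 |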
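
This page does not define the write modes, so the following is only a sketch of what append, replace, and upsert conventionally mean, applied to an in-memory list of rows. The `write` function and the `key` column are hypothetical and are not part of Acme. The merge mode is left out because its semantics are not described here.

```python
def write(table, rows, mode, key="id"):
    """Return the table contents after applying `rows` in the given mode.

    Illustrative semantics only; `key` is a hypothetical primary-key column.
    """
    if mode == "append":
        # Keep every existing row and add the new rows at the end.
        return table + rows
    if mode == "replace":
        # Discard the existing rows and keep only the new ones.
        return list(rows)
    if mode == "upsert":
        # Overwrite rows whose key already exists, insert the rest.
        merged = {row[key]: row for row in table}
        for row in rows:
            merged[row[key]] = row
        return list(merged.values())
    raise ValueError(f"unsupported write mode: {mode}")


existing = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Bob"}]
incoming = [{"id": 2, "name": "Robert"}, {"id": 3, "name": "Cy"}]

appended = write(existing, incoming, "append")   # 4 rows, id 2 appears twice
replaced = write(existing, incoming, "replace")  # only the incoming rows
upserted = write(existing, incoming, "upsert")   # 3 rows, id 2 updated in place
```

The practical difference is what happens to a row that already exists: append duplicates it, replace drops everything that was there before, and upsert overwrites it by key.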

Connector configuration

Each connector requires specific configuration. Here's a typical database connector:

sources:
  - type: postgres
    name: users_db
    connection: ${DATABASE_URL}
    query: "SELECT * FROM users WHERE updated_at > :last_run"
    timeout: 60
    batch_size: 10000
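
To make the incremental query above more concrete, here is a minimal sketch of the same pattern using Python's built-in `sqlite3` module, which accepts the same `:last_run` named-placeholder syntax. How Acme itself binds `:last_run` and applies `batch_size` is not documented on this page, so `fetch_incremental` and the sample data are assumptions for illustration only.

```python
import sqlite3

# In-memory database with a few sample rows (illustrative data).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, updated_at TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [
        (1, "Ada", "2024-01-01T00:00:00"),
        (2, "Bob", "2024-02-01T00:00:00"),
        (3, "Cy", "2024-03-01T00:00:00"),
    ],
)


def fetch_incremental(conn, last_run, batch_size):
    """Yield batches of rows whose updated_at is later than `last_run`."""
    cursor = conn.execute(
        "SELECT * FROM users WHERE updated_at > :last_run ORDER BY updated_at",
        {"last_run": last_run},  # bound as a parameter, never string-formatted
    )
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        yield batch


# Only rows changed after the previous run are returned, one per batch here.
batches = list(fetch_incremental(conn, "2024-01-15T00:00:00", batch_size=1))
```

Binding the timestamp as a parameter keeps the query text constant between runs and avoids building SQL from strings.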

Connection strings

Use environment variables for secrets

Never hardcode credentials in your pipeline files. Reference environment variables with ${VARIABLE_NAME} instead; see Environment Variables. The URLs below only illustrate the expected format for each database.

# PostgreSQL
connection: postgresql://user:pass@host:5432/dbname

# MySQL
connection: mysql://user:pass@host:3306/dbname

# MongoDB
connection: mongodb://user:pass@host:27017/dbname
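
As a rough illustration of what happens to a value such as `${DATABASE_URL}`, the sketch below expands `${NAME}` references from an environment mapping and then splits the resulting URL into its parts. The helpers `expand_env` and `describe` are hypothetical; Acme's actual substitution rules (for example, how unset variables are handled) are covered in Environment Variables and may differ.

```python
import os
import re
from urllib.parse import urlsplit

# Matches ${NAME} where NAME is a conventional environment variable name.
_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value, env=None):
    """Replace each ${NAME} in `value`, raising KeyError if NAME is unset."""
    env = os.environ if env is None else env

    def lookup(match):
        name = match.group(1)
        if name not in env:
            raise KeyError(f"environment variable {name} is not set")
        return env[name]

    return _VAR.sub(lookup, value)


def describe(connection):
    """Split a connection URL into parts, deliberately omitting the password."""
    parts = urlsplit(connection)
    return {
        "scheme": parts.scheme,
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


# A plain dict stands in for the process environment in this example.
env = {"DATABASE_URL": "postgresql://user:pass@host:5432/dbname"}
info = describe(expand_env("${DATABASE_URL}", env))
```

Failing on an unset variable, rather than substituting an empty string, surfaces a missing secret before any connection attempt is made.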

Building custom connectors

If Acme doesn't support your data source, you can build a custom connector:

# connectors/my_custom_source.py
from acme.sdk import SourceConnector, Row

class MyCustomSource(SourceConnector):
    """Read data from a custom API."""

    def setup(self, config):
        # Read this connector's settings from its configuration entry.
        self.api_url = config["api_url"]
        self.api_key = config["api_key"]

    def extract(self, last_run=None):
        # Request records, passing last_run as the "since" query parameter.
        response = self.http.get(
            self.api_url,
            headers={"Authorization": f"Bearer {self.api_key}"},
            params={"since": last_run}
        )
        # Yield one Row per item in the response's "data" array.
        for item in response.json()["data"]:
            yield Row(item)

    def teardown(self):
        # Nothing to clean up for this connector.
        pass

Register it in your configuration:

plugins:
  connectors:
    - path: ./connectors/my_custom_source.py
      type: my_custom_source

sources:
  - type: my_custom_source
    api_url: https://api.example.com/data
    api_key: ${MY_API_KEY}

Note

Custom connectors have full access to the Acme SDK. See the Connector API for the complete interface.
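
One way to exercise a connector like the one above without calling the real API is to swap in a fake HTTP client. The sketch below is self-contained, so `Row` and `SourceConnector` are minimal stand-ins rather than the real `acme.sdk` classes, and `FakeHttp` and `FakeResponse` are hypothetical test helpers. It assumes only what the example shows: that the connector reads its client from `self.http` and that responses expose `json()`.

```python
# Stand-ins for acme.sdk so this sketch runs on its own.
# The real SourceConnector and Row may behave differently.
class Row(dict):
    pass


class SourceConnector:
    http = None  # the example connector expects an HTTP client here


class MyCustomSource(SourceConnector):
    def setup(self, config):
        self.api_url = config["api_url"]
        self.api_key = config["api_key"]

    def extract(self, last_run=None):
        response = self.http.get(
            self.api_url,
            headers={"Authorization": f"Bearer {self.api_key}"},
            params={"since": last_run},
        )
        for item in response.json()["data"]:
            yield Row(item)

    def teardown(self):
        pass


class FakeResponse:
    """Hypothetical response object that returns a canned payload."""

    def __init__(self, payload):
        self._payload = payload

    def json(self):
        return self._payload


class FakeHttp:
    """Hypothetical client that records each request instead of sending it."""

    def __init__(self, payload):
        self.payload = payload
        self.calls = []

    def get(self, url, headers=None, params=None):
        self.calls.append({"url": url, "headers": headers, "params": params})
        return FakeResponse(self.payload)


# Drive the connector through setup, extract, and teardown.
source = MyCustomSource()
source.http = FakeHttp({"data": [{"id": 1}, {"id": 2}]})
source.setup({"api_url": "https://api.example.com/data", "api_key": "test-key"})
rows = list(source.extract(last_run="2024-01-01T00:00:00Z"))
source.teardown()
```

Recording the calls makes it possible to check that the API key and the `since` parameter are sent as intended before the connector is registered in a pipeline.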

Health checks

Acme periodically checks connector health. You can see the status with:

acme connectors status
┌─────────────┬──────────┬─────────┬──────────────────┐
│ Connector   │ Type     │ Status  │ Last Check       │
├─────────────┼──────────┼─────────┼──────────────────┤
│ users_db    │ postgres │ healthy │ 2 minutes ago    │
│ analytics   │ bigquery │ healthy │ 2 minutes ago    │
│ events      │ kafka    │ warning │ 5 minutes ago    │
└─────────────┴──────────┴─────────┴──────────────────┘
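
If the status needs to be checked from a script, the table can be parsed as text. The sketch below assumes the output looks exactly like the sample above; `parse_status_table` is a hypothetical helper, not part of Acme, and a machine-readable output format would be preferable if the CLI provides one.

```python
def parse_status_table(text):
    """Parse the box-drawn status table into a list of dicts keyed by column."""
    rows = []
    for line in text.splitlines():
        # Header and data rows start with a vertical bar; border rows do not.
        if not line.startswith("│"):
            continue
        cells = [cell.strip() for cell in line.strip("│").split("│")]
        rows.append(cells)
    if not rows:
        return []
    header, *body = rows
    return [dict(zip(header, cells)) for cells in body]


# Sample output copied from the command above.
output = """\
┌─────────────┬──────────┬─────────┬──────────────────┐
│ Connector   │ Type     │ Status  │ Last Check       │
├─────────────┼──────────┼─────────┼──────────────────┤
│ users_db    │ postgres │ healthy │ 2 minutes ago    │
│ analytics   │ bigquery │ healthy │ 2 minutes ago    │
│ events      │ kafka    │ warning │ 5 minutes ago    │
└─────────────┴──────────┴─────────┴──────────────────┘
"""

statuses = parse_status_table(output)
# Collect the names of connectors that are not reported as healthy.
unhealthy = [row["Connector"] for row in statuses if row["Status"] != "healthy"]
```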