Tools For Better Developers

Osm Admin: Saving Objects. Incremental Indexing. `INSERT INTO ... SELECT ...`

2022 April Osm Admin

2 years ago ∙ 3 minutes read

New month, same worries - I continued working on Osm Admin:

  • routes responsible for creating new products, and modifying existing products
  • incremental indexing
  • cloning query objects
  • INSERT INTO ... SELECT ... statements

More details:


When generating SQL, Query::generate...() methods fill in the temporary from property, and then it's converted into the FROM clause:

// Query::generateSelect()
FROM {$this->generateFrom($from)}

protected function generateFrom(array $from): string
    $sql = '';


    foreach ($from as $alias => $on) {
        if ($on !== true) { // if it's a JOIN
            $sql .= "\n    {$on}";

        // otherwise, it's the main table, or a singleton
        if ($sql) {
            $sql .= ", \nFROM ";

        $sql .= "`{$alias}`";

    return $sql;

As you can see, keys in from array are table aliases, and value is either true or a string containing JOIN clause.

For example, given the following sample from array:

$from = [
    'products' => true,
    'products__category' => "INNER JOIN `categories` AS `products__category` " . 
        "\n    ON `products__category`.`id` = `products`.`category_id`"
    'settings' => true,

Query generates the following SQL statement:

FROM `products`
INNER JOIN `categories` AS `products__category`
    ON `products__category`.`id` = `products`.`category_id`,
FROM `settings`

Generating Notification Table Join

Having this knowledge, the incremental search indexing or new objects works as follows:

// Search\Index
protected function partialReindex(): void {
    $listensTo = $this->listens_to[$this->table->name];

    // copy new entries
    $query = $this->query()->joinInsertNotifications($this);
    foreach ($query->get() as $item) {

    // delete processed insert notifications
    $notificationTable = $this->getNotificationTableName($this->table,


// Query
public function joinInsertNotifications(Indexer $indexer,
    string $identifier = 'id'): static
    return $this->joinNotifications($indexer, static::INSERTED,

public function joinNotifications(Indexer $indexer, string $event,
    string $identifier): static
    $this->notification_joins[] = [
        'identifier' => $this->parse($identifier, Formula::IDENTIFIER),
        'notification_table' => $indexer->getNotificationTableName(
            $this->table, $indexer->listens_to[$this->table->name][$event]),

    return $this;

protected function generateSelect(array &$bindings): string
    $this->generateNotificationJoins($bindings, $from);

protected function generateNotificationJoins(array &$bindings,
    array &$from): void
    foreach ($this->notification_joins as $join) {
        $table = $join['notification_table'];
        /* @var Formula\Identifier $identifier */
        $identifier = $join['identifier'];

        $from[$table] = <<<EOT
INNER JOIN `$table`
        ON `$table`.`id` = {$identifier->toSql($bindings, $from, 'INNER')}

And it works!

Here the generated SQL that selects new products:

SELECT `products`.`id` AS `id`, 
    `products`.`_data`->>'$."title"' AS `title`, 
    `products`.`_data`->>'$."color"' AS `color`
FROM `products`
    INNER JOIN `zi9__products__inserts`
        ON `zi9__products__inserts`.`id` = `products`.`id`

Triggering Incremental Indexing After Insert

osm index command works. However, running it after creating each product is not how it supposed to be. Indexing should run automatically.

I already defined the indexing behavior earlier:

Then, a separate process runs that processes records in the notification tables, updates target objects, and clears processed notification records.

The idea was that the separate process is a queue worker, and Query operation should put a queued job for it. I'll implement queues later.

For now, I'll run the indexing in the same process:

// Query
public function insert(array $data): int {
    return $this->db->transaction(function() use($data) {
        // register a callback that is executed after a successful transaction
            // successful transaction guarantees that current objects are
            // fully up-to-date (except aggregations), so it's a good time to
            // make sure that asynchronous indexing is queued, or to execute
            // it right away if queue is not configured. All types of asynchronous
            // indexing are queued/executed: regular, aggregation and search.

        return $id;

protected function updateDependentObjects(): void {

Lucky day, indeed. It works, too!


The route that handles the editing form, POST / is similar to POST /create - the one that creates a new object:

#[Ui(Admin::class), Name('POST /')]
class Edit extends Route
    public function run(): Response
        $item = json_decode($this->http->content, flags: JSON_THROW_ON_ERROR);
        if (!is_object($item)) {
            return plain_response(__("Object expected"), 500);

        $query = ui_query($this->table->name);

                'limit', 'offset', 'order', 'select')

        return json_response((object)[]);


The POST / route throws an error:

Call to undefined method Osm_Admin_Samples\Osm\Admin\Queries\Query::clone()

#0 /home/vo/projects/admin2/src/Queries/Query.php(587)

Earlier, I left a technical debt:

Note. $query->clone() and $query->into() will be implemented later.

Now it's time to return it.

The line using the undefined clone() and into() methods, looks as follows:


The idea of clone() is to take the WHERE part of the UPDATE query, which is:

UPDATE `products`
SET `_data` = JSON_SET(`_data`, '$."color"', ?)
WHERE (`products`.`id` IN (?, ?, ?)) 

Later, I may want to clone other parts of a query, too, so let's add the filters flag to the method signature:

    ->clone(where: true)

Here is the clone() method itself:

// Osm\Admin\Queries\Query
public function clone(bool $where = false): static {
    $query = static::new(['table' => $this->table]);

    if ($where) {
        foreach ($this->filters as $formula) {
            $query->filters[] = $formula->clone();

    return $query;

// Osm\Admin\Queries\Formula
public function clone(): Formula {
    $formula = static::new();

    foreach ($this as $propertyName => $value) {
        if (!($property = $this->__class->properties[$propertyName] ?? null)) {

        if (!isset($property->attributes[Serialized::class])) {

        if (!is_a($property->type, Formula::class, true)) {
            $formula->$propertyName = $value;

        if (!$property->array) {
            /* @var Formula $value */
            $formula->$propertyName = $value->clone();
            $value->parent = $formula;

        /* @var Formula[] $value */
        $formula->$propertyName = [];
        foreach ($value as $key => $item) {
            $formula->$propertyName[$key] = $item->clone();
            $item->parent = $formula;

    return $formula;


This method is for generating


First, let's reserve into() method name for mass inserting objects into object tables, and notifying indexers along the way.

Now, a method is needed for notification tables only, so let's name it accordingly:

// Osm\Admin\Queries\Query
public function intoNotificationTable(): void {
    throw new NotImplemented($this);

Inside, it should generate the SELECT as usual:

public function intoNotificationTable(string $tableName): void {
    $bindings = [];
    $sql = "INSERT IGNORE INTO `{$tableName}` (`id`)\n";
    $sql .= $this->generateSelect($bindings);

    $this->db->connection->insert($sql, $bindings));