Intraoperation Parallelism


Intraoperation parallelism is a technique used in parallel databases to speed up individual operations by distributing them across multiple processors. It focuses on making each operation, such as a scan, join, or sort, execute faster by dividing it into smaller subtasks and performing those subtasks in parallel.

Key Concepts of Intraoperation Parallelism

  • Dividing a Single Operation into Subtasks: Intraoperation parallelism works by breaking down a single database operation into smaller, independent tasks. Each subtask handles a portion of the data and executes the same operation, independently of the others.

  • Processor Assignment: These subtasks are assigned to different processors. By working on different parts of the data simultaneously, the operation as a whole can be completed faster.

  • Data Partitioning: To enable parallel processing, data is divided into partitions, which are then distributed across processors. This partitioning can be done in several ways:

    • Range Partitioning: Data is split by ranges (e.g., rows with specific attribute values), with each processor working on a specific range.
    • Hash Partitioning: A hash function applied to an attribute determines the partition, ensuring that rows with the same attribute value end up on the same processor. This method is particularly useful for operations like joins.
    • Round-Robin Partitioning: Data is evenly distributed among processors in a round-robin fashion, which can be useful for scans.
  • Execution Speedup: Since each processor operates on a subset of the data independently, the operation completes faster than if a single processor handled the entire dataset.
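
The three partitioning strategies above can be simulated in a few lines of code. The following Python sketch is purely illustrative (a single-process simulation; the function names, sample rows, and the `boundaries`/`num_partitions` parameters are invented for this example), showing how rows might be routed to partitions under each scheme.

```python
# Illustrative sketch: assigning rows to partitions (one per "processor").
# This is a single-process simulation of the data distribution only.

def range_partition(rows, key, boundaries):
    """Send each row to the partition whose range covers its key value.
    `boundaries` holds the upper bound of every partition except the last."""
    parts = [[] for _ in range(len(boundaries) + 1)]
    for row in rows:
        i = 0
        while i < len(boundaries) and key(row) > boundaries[i]:
            i += 1
        parts[i].append(row)
    return parts

def hash_partition(rows, key, num_partitions):
    """Send each row to the partition chosen by hashing its key."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[hash(key(row)) % num_partitions].append(row)
    return parts

def round_robin_partition(rows, num_partitions):
    """Distribute rows evenly, one partition at a time."""
    parts = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        parts[i % num_partitions].append(row)
    return parts

if __name__ == "__main__":
    employees = [(1, "A", 10), (2, "B", 30), (3, "C", 55), (4, "D", 80)]
    dept_id = lambda row: row[2]
    print(range_partition(employees, dept_id, boundaries=[25, 50, 75]))
    print(hash_partition(employees, dept_id, num_partitions=3))
    print(round_robin_partition(employees, num_partitions=3))
```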

Parallel Sort: Range-Partitioning Sort

The Range-Partitioning Sort is a parallel sorting technique where each processor is assigned a specific range of data to sort. Here’s how it works:

  1. Processor Selection and Partition Vector Creation:

    • Select a set of processors P0, P1, ..., Pm (where m ≤ n − 1, with n as the total number of processors).
    • Create a range-partition vector with m entries. This vector defines the range of data each processor will handle.
  2. Redistribution Using Range Partitioning:

    • In this step, the data is redistributed based on the range-partition vector:
      • All tuples (rows) that fall within a given range are sent to the designated processor Pi.
      • Each processor Pi stores its assigned tuples on its local disk Di.
      • This redistribution allows each processor to receive only the data it will sort.
  3. Local Sorting by Each Processor:

    • Each processor sorts the data in its assigned range independently. This local sorting is efficient since each processor is dealing with a smaller, defined portion of the data.
    • Since all processors perform the sorting operation in parallel, this step is completed faster than if a single processor handled all data.
  4. Final Merge Operation:

    • The final step is a simple merge. Since the data was sorted and partitioned by range, the results from each processor are already ordered relative to one another.
    • For any two processors Pi and Pj where i < j, the maximum key held by Pi is less than or equal to the minimum key held by Pj.
    • By concatenating the sorted output from each processor in order, the system achieves a fully sorted result without needing an additional merge operation.
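
A minimal, single-process Python sketch of the range-partitioning sort described above is shown below (the data values and boundaries are invented; each list stands in for one processor's local disk, and the per-partition sorts are where real parallelism would occur).

```python
# Sketch of a range-partitioning sort, simulated in a single process.
# Each "processor" is modeled as a list; parallelism is implied, not real.

def range_partition_sort(rows, key, boundaries):
    # Steps 1-2: redistribute tuples so processor i receives range i.
    parts = [[] for _ in range(len(boundaries) + 1)]
    for row in rows:
        i = 0
        while i < len(boundaries) and key(row) > boundaries[i]:
            i += 1
        parts[i].append(row)

    # Step 3: each processor sorts its own partition independently
    # (these sorts could run in parallel on separate machines).
    locally_sorted = [sorted(p, key=key) for p in parts]

    # Step 4: concatenation is enough -- the ranges are disjoint and ordered,
    # so no extra merge pass is needed.
    result = []
    for p in locally_sorted:
        result.extend(p)
    return result

if __name__ == "__main__":
    data = [42, 7, 93, 15, 61, 28, 77, 3]
    print(range_partition_sort(data, key=lambda x: x, boundaries=[25, 50, 75]))
    # -> [3, 7, 15, 28, 42, 61, 77, 93]
```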

Parallel Join

In a parallel join operation, multiple processors handle separate portions of the join workload, improving speed by dividing the pairs of tuples that need to be checked for matching join conditions. The basic steps are:

  1. Pair Testing Across Processors:
    • The join operation compares pairs of tuples from two relations (e.g., 𝑟1 and 𝑟2) to find matching pairs that satisfy the join condition.
    • Parallel join algorithms distribute these pair comparisons across several processors, allowing each processor to test different pairs independently.
  2. Local Join Computation:
    • Each processor computes part of the join locally by testing its assigned pairs and adding matching pairs to its local output.
  3. Collecting Final Results:
    • In the final step, results from each processor are collected and combined to form the complete join output.

Types of Parallel Join

  • Partitioned Join
  • Fragment-and-Replicate Join
  • Partitioned Parallel Hash-Join
  • Parallel Index Nested-Loop Join

Partitioned Join

  • For equi-joins and natural joins
  • Equi-join: An equi-join is a type of join where the condition is based on equality (i.e., a specific column in one table matches a specific column in another table).
SELECT * FROM employee INNER JOIN department ON employee.DepartmentID = department.DepartmentID;
  • Natural join: A natural join is a type of join that automatically matches columns with the same name in both tables. It combines rows with equal values in these matching columns.
SELECT * FROM employee NATURAL JOIN department;
  • The basic steps are:
    • Partitioning the Relations:
      • Let r be the employee table and s be the department table.
      • Both r and s are divided into n partitions, resulting in subsets r0, r1, ..., rn-1 and s0, s1, ..., sn-1.
      • Each partition of r and s is created based on the DepartmentID (the join attribute), using either range partitioning or hash partitioning.
    • Range or Hash Partitioning:
      • In range partitioning, data is divided by defining a range for each partition. For example, if DepartmentIDs range from 1 to 100, the first partition could include IDs 1-25, the second 26-50, and so on.
      • In hash partitioning, a hash function is applied to the DepartmentID values to decide the partition for each row.
    • Sending Partitions to Processors:
      • After partitioning, each partition of r and s (e.g., ri and si) is assigned to a specific processor, Pi.
      • This means that Pi receives ri (a subset of employee data) and si (a subset of department data) and will work with these subsets exclusively.
    • Local Join Computation:
      • Each processor Pi performs the join operation locally on the partitions it receives, such as ri ⋈ si.
      • Standard join methods (like nested-loop join, hash join, or sort-merge join) can be used within each processor for local computation.
    • Combining Results:
      • After each processor completes the join for its partition, the results are combined across all processors to get the final joined output.
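
The following Python sketch simulates a partitioned equi-join on DepartmentID in a single process (the sample rows and the helper name `partitioned_join` are invented for illustration): both relations are hash-partitioned on the join attribute, and each "processor" performs a local hash join on its pair of partitions.

```python
# Sketch of a partitioned (equi-)join on DepartmentID, simulated serially.
# Hash partitioning sends matching employee and department rows to the
# same "processor", so each local join can run independently.

def partitioned_join(employee, department, n):
    # Partition both relations on the join attribute with the same hash.
    emp_parts = [[] for _ in range(n)]
    dep_parts = [[] for _ in range(n)]
    for e in employee:
        emp_parts[hash(e["DepartmentID"]) % n].append(e)
    for d in department:
        dep_parts[hash(d["DepartmentID"]) % n].append(d)

    # Each processor Pi joins r_i with s_i locally (here: a hash join).
    result = []
    for i in range(n):
        index = {}
        for d in dep_parts[i]:
            index.setdefault(d["DepartmentID"], []).append(d)
        for e in emp_parts[i]:
            for d in index.get(e["DepartmentID"], []):
                result.append({**e, **d})
    # Combining results is just collecting the local outputs.
    return result

if __name__ == "__main__":
    employee = [{"Name": "Ann", "DepartmentID": 1},
                {"Name": "Bob", "DepartmentID": 2}]
    department = [{"DepartmentID": 1, "DeptName": "Sales"},
                  {"DepartmentID": 2, "DeptName": "HR"}]
    print(partitioned_join(employee, department, n=3))
```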

Fragment-and-Replicate Join

The Fragment-and-Replicate Join is a parallel join strategy used in databases when the more common partitioned join does not work efficiently. Specifically, fragment-and-replicate joins are beneficial for non-equi-joins or conditions where each tuple in one table might need to join with multiple tuples in another table, making it hard to achieve effective partitioning. Here’s a breakdown of how it works:

Why Fragment-and-Replicate is Needed

  • Limitations of Partitioned Join: Partitioned joins work well for equi-join conditions (e.g., r.A = s.B) because data can be easily divided by the join key. However, non-equi-join conditions, such as r.A > s.B, make partitioning ineffective since tuples in one relation might need to be compared with multiple tuples in the other relation.
  • All Tuples in r May Participate: For conditions like r.A > s.B, every tuple in r might potentially join with every tuple in s, making it impractical to partition based on specific attribute values.

How Fragment-and-Replicate Join Works

  • Partition Both Relations:
    • The relations r and s are partitioned into subsets: r into n partitions r0, r1, ..., rn-1, and s into m partitions s0, s1, ..., sm-1.
    • Any partitioning technique can be used (e.g., range or hash partitioning).
  • Processor Assignment and Replication:
    • To handle all potential join combinations, m × n processors are used, one for each partition pair. These processors are labeled Pi,j (e.g., P0,0, P0,1, ..., Pn-1,m-1).
    • Each partition ri of r is replicated to every processor in its row (Pi,0, Pi,1, ..., Pi,m-1), and each partition sj of s is replicated to every processor in its column (P0,j, P1,j, ..., Pn-1,j).
  • Local Join Execution:
    • Each processor Pi,j now has access to a unique combination of partitions ri and sj and computes the join between them locally.
    • Any join technique (e.g., nested-loop, hash join) can be used on each processor to evaluate the join condition.
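
Below is a rough, single-process Python simulation of the symmetric fragment-and-replicate join for the non-equi condition r.A > s.B (the fragment counts, sample values, and round-robin fragmentation are arbitrary choices for this sketch). Each (i, j) loop iteration plays the role of processor Pi,j.

```python
# Sketch of a (symmetric) fragment-and-replicate join for a non-equi-join
# condition r.A > s.B, simulated serially with n*m "processors".

def fragment_and_replicate_join(r, s, n, m, condition):
    # Fragment r into n pieces and s into m pieces (round-robin here;
    # any partitioning works because the condition is not an equality).
    r_parts = [r[i::n] for i in range(n)]
    s_parts = [s[j::m] for j in range(m)]

    # Processor P(i, j) receives r_i (replicated along its row) and
    # s_j (replicated along its column) and joins them locally.
    result = []
    for i in range(n):
        for j in range(m):
            for a in r_parts[i]:
                for b in s_parts[j]:
                    if condition(a, b):
                        result.append((a, b))
    return result

if __name__ == "__main__":
    r = [5, 12, 20]
    s = [3, 15]
    # Every (a, b) pair with a > b, computed across 2 x 2 "processors".
    print(fragment_and_replicate_join(r, s, n=2, m=2,
                                      condition=lambda a, b: a > b))
```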

Special Case: Asymmetric Fragment-and-Replicate Join

In some cases, it is more efficient to only partition one relation and replicate the other across all processors:

  • One relation (say 𝑟) is partitioned into subsets 𝑟₀, 𝑟₁, ..., 𝑟𝑛₋₁, and each partition is sent to a different processor.
  • The second relation 𝑠 is small enough to be replicated across all processors.
  • Each processor 𝑃𝑖 can then compute the join of its local partition of 𝑟 with the entire relation 𝑠.

Example Scenario: Asymmetric Fragment-and-Replicate

  • Suppose 𝑟 is a large table with millions of rows, already partitioned across processors.
  • Suppose 𝑠 is a small table with only a few thousand rows.
  • Instead of re-partitioning 𝑟 and 𝑠, it’s more efficient to replicate 𝑠 on each processor and let each processor compute the join of its local partition of 𝑟 with 𝑠.

Partitioned Parallel Hash-Join

The Partitioned Parallel Hash-Join is a method used in parallel database systems to efficiently join two relations, 𝑟 and 𝑠, using hashing techniques to distribute the work across multiple processors. This approach is useful for handling large datasets that need to be processed in parallel. Here's a step-by-step explanation:

  • Choosing the Build Relation
    • Determine the Smaller Relation: We first choose 𝑠 (the smaller of the two relations) as the "build" relation. This is done because it requires fewer resources to partition and distribute, which reduces data movement and memory usage.
    • Initial Hashing with h1: A hash function h1 is applied to the join attribute of each tuple in 𝑠 to map the tuple to one of the processors, say 𝑃₀, 𝑃₁, ..., 𝑃𝑚₋₁.
  • Distributing and Storing 𝑠
    • Distribute 𝑠 Tuples: Each processor, 𝑃𝑖, receives a subset 𝑠𝑖 ​of tuples from 𝑠 based on ℎ1 and stores them locally.
    • Further Hashing with ℎ2: On each processor, the subset 𝑠𝑖 is further partitioned locally using another hash function ℎ2. This helps to organize the data for efficient hash joins within each processor.
  • Redistributing 𝑟 Using ℎ1
    • Redistribute Relation 𝑟: Relation 𝑟 is also partitioned using the same hash function ℎ1, sending each of its tuples to the corresponding processor.
    • Repartition 𝑟𝑖 Using ℎ2: When processor 𝑃𝑖 receives its portion of 𝑟, denoted 𝑟𝑖, it further partitions these tuples using ℎ2, just as it did with 𝑠𝑖.
  • Performing Local Hash-Joins: Each processor 𝑃𝑖 now has locally partitioned segments of both 𝑟 and 𝑠 (i.e., 𝑟𝑖 and 𝑠𝑖), both organized by ℎ2, and performs a hash join on them locally.
  • Merging Results
    • Independent Processing: Since each processor is performing the join independently on its assigned partitions, there’s no need for further communication between processors.
    • Final Result Assembly: The final result is obtained by combining the results from all processors.
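
A simplified Python sketch of the partitioned parallel hash-join is given below, simulated in one process (the hash functions `h1` and `h2` and the sample tuples are placeholders; a real system would use carefully chosen, independent hash functions and run the build and probe phases on separate processors).

```python
# Sketch of a partitioned parallel hash-join, simulated serially.
# h1 routes tuples to processors; h2 builds the local hash buckets.

def parallel_hash_join(r, s, r_key, s_key, num_procs):
    h1 = lambda k: hash(k) % num_procs          # processor-level hash
    h2 = lambda k: hash((k, "local"))           # local bucket hash (illustrative)

    # Build phase: distribute the smaller relation s with h1, then build a
    # local hash table on each processor keyed by h2.
    build_tables = [dict() for _ in range(num_procs)]
    for t in s:
        p = h1(s_key(t))
        build_tables[p].setdefault(h2(s_key(t)), []).append(t)

    # Probe phase: redistribute r with the same h1 and probe locally.
    result = []
    for t in r:
        p = h1(r_key(t))
        for match in build_tables[p].get(h2(r_key(t)), []):
            if r_key(t) == s_key(match):        # verify the join condition
                result.append((t, match))
    return result

if __name__ == "__main__":
    r = [("Ann", 1), ("Bob", 2), ("Cho", 1)]
    s = [(1, "Sales"), (2, "HR")]
    print(parallel_hash_join(r, s, r_key=lambda t: t[1],
                             s_key=lambda t: t[0], num_procs=4))
```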

Parallel Index Nested-Loop Join

In the Parallel Index Nested-Loop Join, the goal is to use an index on the join attribute of a large relation 𝑟 to efficiently join it with a smaller relation 𝑠 across multiple processors. Here’s the process step-by-step:

  • Assumptions:
    • The relation 𝑠 is significantly smaller than 𝑟, denoted 𝑠 ≪ 𝑟.
    • The relation 𝑟 is already partitioned across multiple processors.
    • There exists an index on the join attribute of 𝑟 that can be utilized for efficient lookups.
  • Use Asymmetric Fragment-and-Replicate:
    • Since 𝑠 is small, we replicate it across all processors that store partitions of 𝑟.
    • This technique avoids the need to repartition 𝑟 by simply sending 𝑠 to every processor that has a partition of 𝑟.
  • Replication Process:
    • For each processor 𝑃𝑗 that stores a portion 𝑠𝑗 of relation 𝑠 :
      • Pj sends a copy of 𝑠𝑗 to every other processor 𝑃𝑖 that contains a partition of 𝑟.
      • As a result, 𝑠 is fully replicated across all processors holding any part of 𝑟.
  • Indexed Nested-Loop Join:
    • Each processor 𝑃𝑖 performs an indexed nested-loop join.
    • Specifically, it joins 𝑠 with the 𝑖th partition of 𝑟 by using the index on 𝑟’s join attribute.
    • The index allows each processor to quickly find matching tuples in its local partition of 𝑟 based on the join condition with 𝑠.
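
The following Python sketch approximates the parallel index nested-loop join in a single process (`build_index` is a hypothetical stand-in for the pre-existing index on 𝑟's join attribute, and the sample partitions are invented). Each iteration over `r_partitions` corresponds to one processor Pi joining the replicated 𝑠 against its local, indexed partition of 𝑟.

```python
# Sketch of a parallel index nested-loop join, simulated serially.
# r is already partitioned; s is small and replicated to every processor.
# Each processor probes a local index on r's join attribute.

def build_index(partition, key):
    """Hypothetical stand-in for an existing index on r's join attribute."""
    index = {}
    for t in partition:
        index.setdefault(key(t), []).append(t)
    return index

def parallel_index_nested_loop_join(r_partitions, s, r_key, s_key):
    result = []
    for part in r_partitions:                 # one iteration per processor Pi
        index = build_index(part, r_key)      # local index on r's join attribute
        for t in s:                           # s is replicated everywhere
            for match in index.get(s_key(t), []):
                result.append((match, t))
    return result

if __name__ == "__main__":
    r_partitions = [[("Ann", 1), ("Bob", 2)], [("Cho", 1), ("Dee", 3)]]
    s = [(1, "Sales"), (3, "Ops")]
    print(parallel_index_nested_loop_join(
        r_partitions, s, r_key=lambda t: t[1], s_key=lambda t: t[0]))
```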

In parallel databases, selection and duplicate elimination operations are often parallelized for efficiency. Here’s a breakdown of these operations:

Selection (σ_θ(r))

The selection operation involves filtering rows in a relation 𝑟 based on a condition 𝜃.

  1. Equality Condition (𝑎𝑖 = 𝑣):
    • If 𝑟 is partitioned on the attribute 𝑎𝑖, then the selection can be performed on a single processor.
    • For example, if we're selecting rows where DepartmentID = 5 and DepartmentID is partitioned across processors, only the processor containing DepartmentID = 5 data will process this query, improving efficiency.
  2. Range Condition (𝑙 ≤ 𝑎𝑖 ≤ 𝑢):
    • If 𝑟 is range-partitioned on 𝑎𝑖, then only the processors whose partitions overlap the range 𝑙 ≤ 𝑎𝑖 ≤ 𝑢 need to perform the selection.
  3. General Case:
    • For conditions that don’t fit the above scenarios, selection is performed in parallel on all processors.
    • Each processor independently filters its partition of 𝑟, which can lead to high efficiency with large datasets.
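
As a small illustration of the range-partitioned case (the sample data and partition ranges are invented), the Python sketch below skips any "processor" whose key range cannot overlap the selection predicate.

```python
# Sketch of selection on a range-partitioned relation, simulated serially.
# Only partitions whose key ranges overlap the predicate are scanned.

def parallel_range_select(partitions, ranges, low, high, key):
    """`ranges[i]` is the (lo, hi) key range stored at processor Pi."""
    result = []
    for part, (lo, hi) in zip(partitions, ranges):
        if hi < low or lo > high:
            continue                      # this processor can be skipped entirely
        result.extend(t for t in part if low <= key(t) <= high)
    return result

if __name__ == "__main__":
    partitions = [[(1, "a"), (10, "b")], [(30, "c"), (45, "d")], [(80, "e")]]
    ranges = [(1, 25), (26, 50), (51, 100)]
    # Only the second partition overlaps the predicate 28 <= key <= 50.
    print(parallel_range_select(partitions, ranges, 28, 50, key=lambda t: t[0]))
```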

Duplicate Elimination

Duplicate elimination removes identical rows in a relation, often used with projection.

  1. During Parallel Sorting: If a parallel sort is used, duplicates can be removed as soon as they are found in each processor’s local partition, reducing memory and processing requirements.
  2. Using Partitioning:
    • The relation 𝑟 can be partitioned (either by range or hash).
    • Each processor performs duplicate elimination locally within its partition, combining results at the end to ensure all duplicates are removed.
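
A minimal Python sketch of hash-partitioned duplicate elimination, simulated serially (the sample rows and the `num_procs` parameter are invented), is shown below; because identical tuples hash to the same partition, local deduplication is sufficient.

```python
# Sketch of parallel duplicate elimination, simulated serially.
# Hash partitioning sends identical tuples to the same processor, so each
# processor can deduplicate its partition independently.

def parallel_distinct(rows, num_procs):
    parts = [set() for _ in range(num_procs)]
    for row in rows:
        parts[hash(row) % num_procs].add(row)    # local dedup via a set
    result = []
    for p in parts:                              # concatenate local results
        result.extend(p)
    return result

if __name__ == "__main__":
    rows = [("Ann", 1), ("Bob", 2), ("Ann", 1), ("Bob", 2), ("Cho", 1)]
    print(sorted(parallel_distinct(rows, num_procs=3)))
```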

Projection

The projection operation creates a new relation by selecting specific columns. In parallel databases:

  • Projection is often combined with duplicate elimination, especially if removing duplicate rows is desired.
  • Each processor only needs to process the columns required for the projection, improving memory usage and processing speed.

Grouping / Aggregation

In parallel databases, grouping and aggregation operations are often divided across processors to improve efficiency. Here’s how the process typically works:

  1. Partitioning on Grouping Attributes:
    • The relation 𝑟 is partitioned across processors based on the grouping attributes.
    • For example, if you want to group employee data by DepartmentID, data is divided among processors such that each processor handles specific departments.
  2. Local Aggregation Computation:
    • Each processor 𝑃𝑖 computes the aggregate values locally on its subset of data.
    • For example, if the goal is to compute the SUM(Salary) for each department, each processor will first calculate the sum of salaries for its own departments.
  3. Reducing Transfer Costs with Partial Aggregation:
    • This local aggregation step reduces the number of tuples and the volume of data that needs to be transferred between processors.
    • After each processor 𝑃𝑖 computes the partial sum for its data, only these partial sums are sent across the network, rather than all tuples.
  4. Final Aggregation:
    • The partial results are then partitioned again based on the grouping attributes.
    • Each processor that now holds data for a specific grouping attribute (e.g., DepartmentID) performs a final aggregation (e.g., SUM) on the partial results to get the total sum.
    • This step consolidates the results, providing the final aggregated values.
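
The partial-aggregation idea can be sketched as follows in Python (a single-process simulation; the salary figures and partition layout are invented). Note that only the small dictionaries of partial sums, not the raw tuples, would cross the network between the two phases.

```python
# Sketch of parallel SUM(Salary) GROUP BY DepartmentID with partial
# aggregation, simulated serially.

def parallel_group_sum(partitions, num_procs):
    # Step 2: each processor computes partial sums on its local data.
    partial = []
    for part in partitions:
        local = {}
        for dept_id, salary in part:
            local[dept_id] = local.get(dept_id, 0) + salary
        partial.append(local)

    # Steps 3-4: only the partial sums are repartitioned (by DepartmentID)
    # and combined into the final totals.
    final = [{} for _ in range(num_procs)]
    for local in partial:
        for dept_id, subtotal in local.items():
            target = final[hash(dept_id) % num_procs]
            target[dept_id] = target.get(dept_id, 0) + subtotal

    totals = {}
    for t in final:
        totals.update(t)
    return totals

if __name__ == "__main__":
    partitions = [[(1, 100), (2, 200), (1, 50)],
                  [(2, 300), (3, 400)]]
    # Department totals: 1 -> 150, 2 -> 500, 3 -> 400
    print(parallel_group_sum(partitions, num_procs=2))
```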

Cost of parallel evaluation of operations

The cost of parallel evaluation of operations in a database system refers to the time it takes to complete an operation using multiple processors, where the goal is to achieve faster results compared to a single-processor system. Here’s a breakdown of how this cost is calculated:

  • If there is no skew, the expected parallel execution time is 1/n of the sequential time (i.e., a speed-up of n).
  • If there is skew, the time can be estimated as Tpart + Tasm + max(T0, T1, ..., Tn-1), where:
    • Tpart is the time for partitioning the relations,
    • Tasm is the time for assembling the results, and
    • Ti is the time taken for the operation at processor Pi.
  • The overall time is determined by the slowest processor, so data skew, partitioning cost, and assembly cost all need to be considered.
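
As a hypothetical worked example (the timings are invented): suppose n = 4, Tpart = 2, Tasm = 1, and the local times T0, T1, T2, T3 are 5, 6, 9, and 5 time units. Then the total time is Tpart + Tasm + max(T0, T1, T2, T3) = 2 + 1 + max(5, 6, 9, 5) = 12 time units. Even though three of the four processors finish their local work in 5 or 6 units, the skewed processor dictates the overall completion time.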